import asyncio
import json
import threading
from collections import ChainMap, defaultdict, deque
from dataclasses import asdict, dataclass, replace
from os import getenv
from textwrap import dedent
from typing import (
    Any,
    AsyncIterator,
    Callable,
    Dict,
    Iterator,
    List,
    Literal,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
    cast,
    get_args,
    overload,
)
from uuid import uuid4

from pydantic import BaseModel

from agno.agent import Agent
from agno.agent.metrics import SessionMetrics
from agno.exceptions import ModelProviderError, RunCancelledException
from agno.knowledge.agent import AgentKnowledge
from agno.media import (
    Audio,
    AudioArtifact,
    AudioResponse,
    File,
    Image,
    ImageArtifact,
    Video,
    VideoArtifact,
)
from agno.memory.agent import AgentMemory
from agno.memory.team import TeamMemory, TeamRun
from agno.memory.v2.memory import Memory, SessionSummary
from agno.models.base import Model
from agno.models.message import Citations, Message, MessageReferences
from agno.models.response import ModelResponse, ModelResponseEvent, ToolExecution
from agno.reasoning.step import NextAction, ReasoningStep, ReasoningSteps
from agno.run.base import RunResponseExtraData, RunStatus
from agno.run.messages import RunMessages
from agno.run.response import RunResponse, RunResponseEvent
from agno.run.team import (
    TeamRunEvent,
    TeamRunResponse,
    TeamRunResponseEvent,
    ToolCallCompletedEvent,
)
from agno.storage.base import Storage
from agno.storage.session.team import TeamSession
from agno.tools.function import Function
from agno.tools.toolkit import Toolkit
from agno.utils.events import (
    create_team_memory_update_completed_event,
    create_team_memory_update_started_event,
    create_team_reasoning_completed_event,
    create_team_reasoning_started_event,
    create_team_reasoning_step_event,
    create_team_run_response_cancelled_event,
    create_team_run_response_completed_event,
    create_team_run_response_content_event,
    create_team_run_response_error_event,
    create_team_run_response_started_event,
    create_team_tool_call_completed_event,
    create_team_tool_call_started_event,
)
from agno.utils.log import (
    log_debug,
    log_error,
    log_exception,
    log_info,
    log_warning,
    set_log_level_to_debug,
    set_log_level_to_info,
    use_agent_logger,
    use_team_logger,
)
from agno.utils.merge_dict import merge_dictionaries
from agno.utils.message import get_text_from_message
from agno.utils.response import (
    check_if_run_cancelled,
    create_panel,
    escape_markdown_tags,
    format_tool_calls,
    update_run_response_with_reasoning,
)
from agno.utils.safe_formatter import SafeFormatter
from agno.utils.string import is_valid_uuid, parse_response_model_str, url_safe_string
from agno.utils.timer import Timer


@dataclass(init=False)
class Team:
    """
    A class representing a team of agents.
    """

    # The agents (or nested teams) that make up this team
    members: List[Union[Agent, "Team"]]

    # How the team leader interacts with members:
    # "route" forwards the task to one member, "coordinate" delegates and synthesizes,
    # "collaborate" gives all members the same task
    mode: Literal["route", "coordinate", "collaborate"] = "coordinate"

    # Model for this Team
    model: Optional[Model] = None

    # --- Team settings ---
    # Name of the team
    name: Optional[str] = None
    # Team UUID (autogenerated if not set)
    team_id: Optional[str] = None
    # If this team is a member of another team, the ID of the parent team
    parent_team_id: Optional[str] = None
    # The workflow this team belongs to
    workflow_id: Optional[str] = None
    # If this team is a member of another team, the role this team plays there
    role: Optional[str] = None

    # --- User settings ---
    # ID of the user interacting with this team
    user_id: Optional[str] = None

    # --- Session settings ---
    # Team Session UUID (autogenerated if not set)
    session_id: Optional[str] = None
    # In the case where the team is a member of a team itself
    team_session_id: Optional[str] = None
    # Session name
    session_name: Optional[str] = None
    # Session state (stored in the database to persist across runs)
    session_state: Optional[Dict[str, Any]] = None

    # Team session state (shared between team leaders and team members)
    team_session_state: Optional[Dict[str, Any]] = None
    # If True, add the session state variables in the user and system messages
    add_state_in_messages: bool = False

    # --- System message settings ---
    # A description of the Team that is added to the start of the system message.
    description: Optional[str] = None
    # List of instructions for the team.
    instructions: Optional[Union[str, List[str], Callable]] = None
    # Provide the expected output from the Team.
    expected_output: Optional[str] = None
    # Additional context added to the end of the system message.
    additional_context: Optional[str] = None
    # If markdown=true, add instructions to format the output using markdown
    markdown: bool = False
    # If True, add the current datetime to the instructions to give the team a sense of time
    # This allows for relative times like "tomorrow" to be used in the prompt
    add_datetime_to_instructions: bool = False
    # If True, add the current location to the instructions to give the team a sense of location
    add_location_to_instructions: bool = False
    # If True, add the tools available to team members to the system message
    add_member_tools_to_system_message: bool = True

    # Provide the system message as a string or function
    system_message: Optional[Union[str, Callable, Message]] = None
    # Role for the system message
    system_message_role: str = "system"

    # --- Success criteria ---
    # Define the success criteria for the team
    success_criteria: Optional[str] = None

    # --- User provided context ---
    # User provided context
    context: Optional[Dict[str, Any]] = None
    # If True, add the context to the user prompt
    add_context: bool = False

    # --- Agent Knowledge ---
    knowledge: Optional[AgentKnowledge] = None
    # Add knowledge_filters to the Agent class attributes
    knowledge_filters: Optional[Dict[str, Any]] = None
    # Let the agent choose the knowledge filters
    enable_agentic_knowledge_filters: Optional[bool] = False

    # If True, add references to the user prompt
    add_references: bool = False
    # Retrieval function to get references
    # This function, if provided, is used instead of the default search_knowledge function
    # Signature:
    # def retriever(team: Team, query: str, num_documents: Optional[int], **kwargs) -> Optional[list[dict]]:
    #     ...
    retriever: Optional[Callable[..., Optional[List[Union[Dict, str]]]]] = None
    references_format: Literal["json", "yaml"] = "json"

    # --- Tools ---
    # If True, enable the team agent to update the team context and automatically send the team context to the members
    enable_agentic_context: bool = False
    # If True, send all previous member interactions to members
    share_member_interactions: bool = False
    # If True, add a tool to get information about the team members
    get_member_information_tool: bool = False
    # Add a tool to search the knowledge base (aka Agentic RAG)
    # Only added if knowledge is provided.
    search_knowledge: bool = True

    # If True, read the team history
    read_team_history: bool = False

    # --- Team Tools ---
    # A list of tools provided to the Model.
    # Tools are functions the model may generate JSON inputs for.
    tools: Optional[List[Union[Toolkit, Callable, Function, Dict]]] = None
    # Show tool calls in Team response. This sets the default for the team.
    show_tool_calls: bool = True
    # Controls which (if any) tool is called by the team model.
    # "none" means the model will not call a tool and instead generates a message.
    # "auto" means the model can pick between generating a message or calling a tool.
    # Specifying a particular function via {"type": "function", "function": {"name": "my_function"}}
    #   forces the model to call that tool.
    # "none" is the default when no tools are present. "auto" is the default if tools are present.
    tool_choice: Optional[Union[str, Dict[str, Any]]] = None
    # Maximum number of tool calls allowed.
    tool_call_limit: Optional[int] = None
    # A list of hooks to be called before and after the tool call
    tool_hooks: Optional[List[Callable]] = None

    # --- Structured output ---
    # Response model for the team response
    response_model: Optional[Type[BaseModel]] = None
    # If `response_model` is set, sets the response mode of the model, i.e. if the model should explicitly respond with a JSON object instead of a Pydantic model
    use_json_mode: bool = False
    # If True, parse the response
    parse_response: bool = True

    # --- History ---
    # Memory for the team
    memory: Optional[Union[TeamMemory, Memory]] = None
    # Enable the agent to manage memories of the user
    enable_agentic_memory: bool = False
    # If True, the agent creates/updates user memories at the end of runs
    enable_user_memories: bool = False
    # If True, the agent adds a reference to the user memories in the response
    add_memory_references: Optional[bool] = None
    # If True, the agent creates/updates session summaries at the end of runs
    enable_session_summaries: bool = False
    # If True, the agent adds a reference to the session summaries in the response
    add_session_summary_references: Optional[bool] = None

    # --- Team History ---
    # If True, enable the team history (Deprecated in favor of add_history_to_messages)
    enable_team_history: bool = False
    # add_history_to_messages=true adds messages from the chat history to the messages list sent to the Model.
    add_history_to_messages: bool = False
    # Deprecated in favor of num_history_runs: Number of interactions from history
    num_of_interactions_from_history: Optional[int] = None
    # Number of historical runs to include in the messages
    num_history_runs: int = 3

    # --- Team Storage ---
    storage: Optional[Storage] = None
    # Extra data stored with this team
    extra_data: Optional[Dict[str, Any]] = None

    # --- Team Reasoning ---
    reasoning: bool = False
    reasoning_model: Optional[Model] = None
    reasoning_agent: Optional[Agent] = None
    reasoning_min_steps: int = 1
    reasoning_max_steps: int = 10

    # --- Team Streaming ---
    # Stream the response from the Team
    stream: Optional[bool] = None
    # Stream the intermediate steps from the Team
    stream_intermediate_steps: bool = False

    # Optional app ID. Indicates this team is part of an app.
    app_id: Optional[str] = None
    # --- Debug & Monitoring ---
    # Enable debug logs
    debug_mode: bool = False
    # Enable member logs - Sets the debug_mode for team and members
    show_members_responses: bool = False
    # monitoring=True logs Team information to agno.com for monitoring
    monitoring: bool = False
    # telemetry=True logs minimal telemetry for analytics
    # This helps us improve the Teams implementation and provide better support
    telemetry: bool = True

    def __init__(
        self,
        members: List[Union[Agent, "Team"]],
        mode: Literal["route", "coordinate", "collaborate"] = "coordinate",
        model: Optional[Model] = None,
        name: Optional[str] = None,
        team_id: Optional[str] = None,
        user_id: Optional[str] = None,
        session_id: Optional[str] = None,
        session_name: Optional[str] = None,
        session_state: Optional[Dict[str, Any]] = None,
        team_session_state: Optional[Dict[str, Any]] = None,
        add_state_in_messages: bool = False,
        description: Optional[str] = None,
        instructions: Optional[Union[str, List[str], Callable]] = None,
        expected_output: Optional[str] = None,
        additional_context: Optional[str] = None,
        success_criteria: Optional[str] = None,
        markdown: bool = False,
        add_datetime_to_instructions: bool = False,
        add_location_to_instructions: bool = False,
        add_member_tools_to_system_message: bool = True,
        system_message: Optional[Union[str, Callable, Message]] = None,
        system_message_role: str = "system",
        context: Optional[Dict[str, Any]] = None,
        add_context: bool = False,
        knowledge: Optional[AgentKnowledge] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        add_references: bool = False,
        enable_agentic_knowledge_filters: Optional[bool] = False,
        retriever: Optional[Callable[..., Optional[List[Union[Dict, str]]]]] = None,
        references_format: Literal["json", "yaml"] = "json",
        enable_agentic_context: bool = False,
        share_member_interactions: bool = False,
        get_member_information_tool: bool = False,
        search_knowledge: bool = True,
        read_team_history: bool = False,
        tools: Optional[List[Union[Toolkit, Callable, Function, Dict]]] = None,
        show_tool_calls: bool = True,
        tool_call_limit: Optional[int] = None,
        tool_choice: Optional[Union[str, Dict[str, Any]]] = None,
        tool_hooks: Optional[List[Callable]] = None,
        response_model: Optional[Type[BaseModel]] = None,
        use_json_mode: bool = False,
        parse_response: bool = True,
        memory: Optional[Union[TeamMemory, Memory]] = None,
        enable_agentic_memory: bool = False,
        enable_user_memories: bool = False,
        add_memory_references: Optional[bool] = None,
        enable_session_summaries: bool = False,
        add_session_summary_references: Optional[bool] = None,
        enable_team_history: bool = False,
        add_history_to_messages: bool = False,
        num_of_interactions_from_history: Optional[int] = None,
        num_history_runs: int = 3,
        storage: Optional[Storage] = None,
        extra_data: Optional[Dict[str, Any]] = None,
        reasoning: bool = False,
        reasoning_model: Optional[Model] = None,
        reasoning_agent: Optional[Agent] = None,
        reasoning_min_steps: int = 1,
        reasoning_max_steps: int = 10,
        stream: Optional[bool] = None,
        stream_intermediate_steps: bool = False,
        debug_mode: bool = False,
        show_members_responses: bool = False,
        monitoring: bool = False,
        telemetry: bool = True,
    ):
        """Create a Team.

        Parameters mirror the class-level attributes; see the field comments on
        the ``Team`` dataclass for the meaning of each setting. Fields without a
        matching parameter (e.g. ``parent_team_id``, ``role``, ``app_id``) fall
        back to their class-level defaults and are set by the framework.
        """
        self.members = members

        self.mode = mode

        self.model = model

        self.name = name
        self.team_id = team_id

        self.user_id = user_id
        self.session_id = session_id
        self.session_name = session_name
        self.session_state = session_state
        self.team_session_state = team_session_state
        self.add_state_in_messages = add_state_in_messages

        self.description = description
        self.instructions = instructions
        self.expected_output = expected_output
        self.additional_context = additional_context
        self.markdown = markdown
        self.add_datetime_to_instructions = add_datetime_to_instructions
        self.add_location_to_instructions = add_location_to_instructions
        self.add_member_tools_to_system_message = add_member_tools_to_system_message
        self.system_message = system_message
        self.system_message_role = system_message_role
        self.success_criteria = success_criteria

        self.context = context
        self.add_context = add_context

        self.knowledge = knowledge
        self.knowledge_filters = knowledge_filters
        self.enable_agentic_knowledge_filters = enable_agentic_knowledge_filters
        self.add_references = add_references
        self.retriever = retriever
        self.references_format = references_format

        self.enable_agentic_context = enable_agentic_context
        self.share_member_interactions = share_member_interactions
        self.get_member_information_tool = get_member_information_tool
        self.search_knowledge = search_knowledge
        self.read_team_history = read_team_history

        self.tools = tools
        self.show_tool_calls = show_tool_calls
        self.tool_choice = tool_choice
        self.tool_call_limit = tool_call_limit
        self.tool_hooks = tool_hooks

        self.response_model = response_model
        self.use_json_mode = use_json_mode
        self.parse_response = parse_response

        self.memory = memory
        self.enable_agentic_memory = enable_agentic_memory
        self.enable_user_memories = enable_user_memories
        self.add_memory_references = add_memory_references
        self.enable_session_summaries = enable_session_summaries
        self.add_session_summary_references = add_session_summary_references

        self.enable_team_history = enable_team_history
        self.add_history_to_messages = add_history_to_messages
        self.num_of_interactions_from_history = num_of_interactions_from_history
        self.num_history_runs = num_history_runs

        self.storage = storage
        self.extra_data = extra_data

        self.reasoning = reasoning
        self.reasoning_model = reasoning_model
        self.reasoning_agent = reasoning_agent
        self.reasoning_min_steps = reasoning_min_steps
        self.reasoning_max_steps = reasoning_max_steps

        self.stream = stream
        self.stream_intermediate_steps = stream_intermediate_steps

        self.debug_mode = debug_mode
        self.show_members_responses = show_members_responses

        self.monitoring = monitoring
        self.telemetry = telemetry

        # --- Params not to be set by user ---
        self.session_metrics: Optional[SessionMetrics] = None
        self.full_team_session_metrics: Optional[SessionMetrics] = None

        self.run_id: Optional[str] = None
        self.run_input: Optional[Union[str, List, Dict]] = None
        self.run_messages: Optional[RunMessages] = None
        self.run_response: Optional[TeamRunResponse] = None

        # Images generated during this session
        self.images: Optional[List[ImageArtifact]] = None
        # Audio generated during this session
        self.audio: Optional[List[AudioArtifact]] = None
        # Videos generated during this session
        self.videos: Optional[List[VideoArtifact]] = None
        # Files collected during this session. Initialized here because
        # _reset_session_state clears it; without this, reading self.files
        # before a session reset would raise AttributeError.
        # NOTE(review): element type not visible in this chunk — presumably a
        # file artifact type; confirm against the rest of the file.
        self.files: Optional[List[Any]] = None

        # Team session
        self.team_session: Optional[TeamSession] = None

        self._tool_instructions: Optional[List[str]] = None
        self._functions_for_model: Optional[Dict[str, Function]] = None
        self._tools_for_model: Optional[List[Dict[str, Any]]] = None

        # True if we should parse a member response model
        self._member_response_model: Optional[Type[BaseModel]] = None

        self._formatter: Optional[SafeFormatter] = None

    def _set_team_id(self) -> str:
        """Return the team's ID, generating and storing a new UUID when unset."""
        team_id = self.team_id
        if team_id is None:
            team_id = str(uuid4())
            self.team_id = team_id
        return team_id

    def _set_debug(self) -> None:
        """Switch team logging to debug level when debug_mode or AGNO_DEBUG=true.

        Also forces self.debug_mode on when the environment variable enables it;
        otherwise the team logger is set to info level.
        """
        env_enabled = getenv("AGNO_DEBUG", "false").lower() == "true"
        if not (self.debug_mode or env_enabled):
            set_log_level_to_info(source_type="team")
            return
        self.debug_mode = True
        set_log_level_to_debug(source_type="team")

    def _set_storage_mode(self) -> None:
        """Mark the configured storage backend (if any) as operating in team mode."""
        storage = self.storage
        if storage is None:
            return
        storage.mode = "team"

    def _set_monitoring(self) -> None:
        """Override monitoring and telemetry settings based on environment variables."""
        # Each flag is only overridden when its environment variable is set;
        # any value other than "true" (case-insensitive) disables the flag.
        overrides = (("AGNO_MONITOR", "monitoring"), ("AGNO_TELEMETRY", "telemetry"))
        for env_name, attr_name in overrides:
            env_value = getenv(env_name)
            if env_value is not None:
                setattr(self, attr_name, env_value.lower() == "true")

    def _initialize_member(
        self, member: Union["Team", Agent], session_id: Optional[str] = None
    ) -> None:
        """Propagate team-level settings onto one member, recursing into sub-teams.

        Display/debug flags cascade one way only: they are switched on for the
        member when enabled on this team, never switched off.
        """
        for flag in ("debug_mode", "show_tool_calls", "markdown"):
            if getattr(self, flag):
                setattr(member, flag, True)

        if session_id is not None:
            member.team_session_id = session_id

        # Share this team's session state with the member, merging when the
        # member already carries state of its own.
        if self.team_session_state is not None:
            if member.team_session_state is None:
                member.team_session_state = self.team_session_state
            else:
                merge_dictionaries(member.team_session_state, self.team_session_state)

        if isinstance(member, Agent):
            member.team_id = self.team_id
            member.set_agent_id()
        elif isinstance(member, Team):
            if member.team_id is None:
                member.team_id = str(uuid4())
            member.parent_team_id = self.team_id
            # Sub-team members also receive this team's settings and session.
            for nested_member in member.members:
                self._initialize_member(nested_member, session_id)

        if member.name is None:
            log_warning("Team member name is undefined.")

    def _set_default_model(self) -> None:
        """Fall back to OpenAI's gpt-4o when no model has been configured.

        Exits the process when `openai` is not installed, since the default
        provider cannot be constructed without it.
        """
        if self.model is not None:
            return

        try:
            from agno.models.openai import OpenAIChat
        except ModuleNotFoundError as e:
            log_exception(e)
            log_error(
                "Agno agents use `openai` as the default model provider. "
                "Please provide a `model` or install `openai`."
            )
            exit(1)

        log_info("Setting default model to OpenAI Chat")
        self.model = OpenAIChat(id="gpt-4o")

    def _set_defaults(self) -> None:
        """Resolve settings whose defaults depend on other settings."""
        # Memory references default to on whenever any user-memory feature is enabled.
        if self.add_memory_references is None:
            self.add_memory_references = self.enable_user_memories or self.enable_agentic_memory

        # Session-summary references track the summaries toggle by default.
        if self.add_session_summary_references is None:
            self.add_session_summary_references = self.enable_session_summaries

        # Honour the deprecated num_of_interactions_from_history setting.
        if self.num_of_interactions_from_history is not None:
            self.num_history_runs = self.num_of_interactions_from_history

    def _reset_session_state(self) -> None:
        """Clear all per-session state so a fresh session can be loaded from storage."""
        session_attrs = (
            "session_name",
            "session_state",
            "team_session_state",
            "session_metrics",
            "images",
            "videos",
            "audio",
            "files",
            "team_session",
        )
        for attr_name in session_attrs:
            setattr(self, attr_name, None)

    def _reset_run_state(self) -> None:
        """Drop all state left over from the previous run."""
        for attr_name in ("run_id", "run_input", "run_messages", "run_response"):
            setattr(self, attr_name, None)

    def initialize_team(self, session_id: Optional[str] = None) -> None:
        """Prepare the team for a run.

        Resolves derived defaults and the default model, configures storage and
        logging, assigns the team ID, sets up memory and the prompt formatter,
        and propagates settings to every member (recursively for sub-teams).

        Args:
            session_id: Current session ID, passed to members as their
                team_session_id. May be None.
        """
        self._set_defaults()
        self._set_default_model()
        self._set_storage_mode()

        # Set debug mode
        self._set_debug()

        # Set monitoring and telemetry
        self._set_monitoring()

        # Set the team ID if not set
        self._set_team_id()

        log_debug(f"Team ID: {self.team_id}", center=True)

        # Initialize memory if not yet set
        if self.memory is None:
            self.memory = Memory()

        # Default to the team's model if no model is provided
        if isinstance(self.memory, Memory):
            if self.memory.model is None and self.model is not None:
                self.memory.set_model(self.model)

        # Initialize formatter
        if self._formatter is None:
            self._formatter = SafeFormatter()

        # Propagate debug/display flags, session state and IDs to each member
        for member in self.members:
            self._initialize_member(member, session_id=session_id)

        # Make sure for the team, we are using the team logger
        use_team_logger()

    @property
    def is_streamable(self) -> bool:
        """Whether the team response can be streamed.

        Streaming is only possible when no structured response_model is set.
        """
        streamable = self.response_model is None
        return streamable

    # Overload: non-streaming call (stream=False) returns the final TeamRunResponse.
    @overload
    def run(
        self,
        message: Union[str, List, Dict, Message],
        *,
        stream: Literal[False] = False,
        stream_intermediate_steps: Optional[bool] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        retries: Optional[int] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> TeamRunResponse: ...

    # Overload: streaming call (stream=True) yields run/response events as they occur.
    @overload
    def run(
        self,
        message: Union[str, List, Dict, Message],
        *,
        stream: Literal[True] = True,
        stream_intermediate_steps: Optional[bool] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        retries: Optional[int] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Iterator[Union[RunResponseEvent, TeamRunResponseEvent]]: ...

    def run(
        self,
        message: Union[str, List, Dict, Message],
        *,
        stream: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        retries: Optional[int] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Union[
        TeamRunResponse, Iterator[Union[RunResponseEvent, TeamRunResponseEvent]]
    ]:
        """Run the Team and return the response."""

        self._reset_run_state()

        if session_id is not None:
            # Reset session state if a session_id is provided. Session name and session state will be loaded from storage.
            self._reset_session_state()

        # Use the default user_id and session_id when necessary
        if user_id is None:
            user_id = self.user_id

        if session_id is None or session_id == "":
            # Default to the team's session_id if no session_id is provided
            if not (self.session_id is None or self.session_id == ""):
                session_id = self.session_id
            else:
                # Generate a new session_id and store it in the agent
                session_id = str(uuid4())
                self.session_id = session_id

        session_id = cast(str, session_id)

        self._initialize_session_state(user_id=user_id, session_id=session_id)

        log_debug(f"Session ID: {session_id}", center=True)

        # Initialize Team
        self.initialize_team(session_id=session_id)

        # Initialize Knowledge Filters
        effective_filters = knowledge_filters

        # When filters are passed manually
        if self.knowledge_filters or knowledge_filters:
            """
                initialize metadata (specially required in case when load is commented out)
                when load is not called the reader's document_lists won't be called and metadata filters won't be initialized
                so we need to call initialize_valid_filters to make sure the filters are initialized
            """
            if not self.knowledge.valid_metadata_filters:  # type: ignore
                self.knowledge.initialize_valid_filters()  # type: ignore

            effective_filters = self._get_team_effective_filters(knowledge_filters)

        # Agentic filters are enabled
        if (
            self.enable_agentic_knowledge_filters
            and not self.knowledge.valid_metadata_filters
        ):  # type: ignore
            # initialize metadata (specially required in case when load is commented out)
            self.knowledge.initialize_valid_filters()  # type: ignore

        # Use stream override value when necessary
        if stream is None:
            stream = False if self.stream is None else self.stream

        if stream_intermediate_steps is None:
            stream_intermediate_steps = (
                False
                if self.stream_intermediate_steps is None
                else self.stream_intermediate_steps
            )

        # Can't have stream_intermediate_steps if stream is False
        if stream is False:
            stream_intermediate_steps = False

        self.stream = self.stream or (stream and self.is_streamable)
        self.stream_intermediate_steps = self.stream_intermediate_steps or (
            stream_intermediate_steps and self.stream
        )

        # Read existing session from storage
        self.read_from_storage(session_id=session_id)

        # Read existing session from storage
        if self.context is not None:
            self._resolve_run_context()

        if self.response_model is not None and self.parse_response and stream is True:
            # Disable stream if response_model is set
            stream = False
            log_debug("Disabling stream as response_model is set")

        # Configure the model for runs
        self._set_default_model()
        response_format: Optional[Union[Dict, Type[BaseModel]]] = (
            self._get_response_format()
        )

        self.model = cast(Model, self.model)
        self.determine_tools_for_model(
            model=self.model,
            session_id=session_id,
            user_id=user_id,
            async_mode=False,
            knowledge_filters=effective_filters,
            message=message,
            images=images,
            videos=videos,
            audio=audio,
            files=files,
        )

        # Register the team on the platform
        thread = threading.Thread(target=self.register_team)
        thread.start()

        # Create a run_id for this specific run
        run_id = str(uuid4())

        # Create a new run_response for this attempt
        run_response = TeamRunResponse(
            run_id=run_id, session_id=session_id, team_id=self.team_id
        )

        run_response.model = self.model.id if self.model is not None else None
        run_response.model_provider = (
            self.model.provider if self.model is not None else None
        )

        self.run_response = run_response
        self.run_id = run_id

        retries = retries or 3

        # Run the team
        last_exception = None
        num_attempts = retries + 1

        for attempt in range(num_attempts):
            # Initialize the current run

            log_debug(f"Team Run Start: {self.run_id}", center=True)
            log_debug(f"Mode: '{self.mode}'", center=True)

            # Set run_input
            if message is not None:
                if isinstance(message, str):
                    self.run_input = message
                elif isinstance(message, Message):
                    self.run_input = message.to_dict()
                else:
                    self.run_input = message

            # Run the team
            try:
                # Prepare run messages
                if self.mode == "route":
                    run_messages: RunMessages = self.get_run_messages(
                        session_id=session_id,
                        user_id=user_id,
                        message=message,
                        audio=audio,
                        images=images,
                        videos=videos,
                        files=files,
                        knowledge_filters=effective_filters,
                        **kwargs,
                    )
                else:
                    run_messages = self.get_run_messages(
                        session_id=session_id,
                        user_id=user_id,
                        message=message,
                        audio=audio,
                        images=images,
                        videos=videos,
                        files=files,
                        knowledge_filters=effective_filters,
                        **kwargs,
                    )
                self.run_messages = run_messages
                if len(run_messages.messages) == 0:
                    log_error("No messages to be sent to the model.")

                if stream and self.is_streamable:
                    response_iterator = self._run_stream(
                        run_response=self.run_response,
                        run_messages=run_messages,
                        stream_intermediate_steps=stream_intermediate_steps,
                        session_id=session_id,
                        user_id=user_id,
                        response_format=response_format,
                    )

                    return response_iterator
                else:
                    return self._run(
                        run_response=self.run_response,
                        run_messages=run_messages,
                        session_id=session_id,
                        user_id=user_id,
                        response_format=response_format,
                    )

            except ModelProviderError as e:
                import time

                log_warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")

                last_exception = e
                if attempt < num_attempts - 1:
                    time.sleep(2**attempt)
            except (KeyboardInterrupt, RunCancelledException):
                if stream and self.is_streamable:
                    return self._generator_wrapper(
                        create_team_run_response_cancelled_event(
                            run_response, "Operation cancelled by user"
                        )
                    )
                else:
                    return self._create_run_response(
                        run_state=RunStatus.cancelled,
                        content="Operation cancelled by user",
                        from_run_response=run_response,
                        session_id=session_id,
                    )

        # If we get here, all retries failed
        if last_exception is not None:
            log_error(
                f"Failed after {num_attempts} attempts. Last error using {last_exception.model_name}({last_exception.model_id})"
            )
            if stream and self.is_streamable:
                return self._generator_wrapper(
                    create_team_run_response_error_event(
                        run_response, error=str(last_exception)
                    )
                )

            raise last_exception
        else:
            if stream and self.is_streamable:
                return self._generator_wrapper(
                    create_team_run_response_error_event(
                        run_response, error=str(last_exception)
                    )
                )

            raise Exception(f"Failed after {num_attempts} attempts.")

    def _run(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
    ) -> TeamRunResponse:
        """Run the Team synchronously and return the populated response.

        Steps:
        1. Reason about the task(s) if reasoning is enabled
        2. Get a response from the model
        3. Add the run to memory
        4. Update Team Memory
        5. Save session to storage
        6. Parse any structured outputs
        7. Log the team run

        Args:
            run_response: Response object populated in place for this run.
            run_messages: Prepared messages to send to the model.
            session_id: Identifier of the session this run belongs to.
            user_id: Optional identifier of the requesting user.
            response_format: Optional structured-output format (dict or
                Pydantic model class) forwarded to the model.

        Returns:
            The same ``run_response`` object, fully populated.
        """
        # 1. Reason about the task(s) if reasoning is enabled
        self._handle_reasoning(run_response=run_response, run_messages=run_messages)

        # Remember where the prepared messages end, so messages appended during
        # the model call can be identified when adding the run to memory.
        index_of_last_user_message = len(run_messages.messages)

        # 2. Get the model response for the team leader
        self.model = cast(Model, self.model)
        model_response: ModelResponse = self.model.response(
            messages=run_messages.messages,
            response_format=response_format,
            tools=self._tools_for_model,
            functions=self._functions_for_model,
            tool_choice=self.tool_choice,
            tool_call_limit=self.tool_call_limit,
        )

        # Copy the model response onto the TeamRunResponse
        self._update_run_response(
            model_response=model_response,
            run_response=run_response,
            run_messages=run_messages,
        )

        # 3. Add the run to memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=index_of_last_user_message,
        )

        # 4. Update Team Memory. _update_memory is a generator (it can yield
        # intermediate-step events); drain it since nothing is streamed here.
        response_iterator = self._update_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
        )
        deque(response_iterator, maxlen=0)

        # 5. Save session to storage
        self.write_to_storage(session_id=session_id, user_id=user_id)

        # 6. Parse team response model
        self._convert_response_to_structured_format(run_response=run_response)

        # 7. Log Team Run
        self._log_team_run(session_id=session_id, user_id=user_id)

        log_debug(f"Team Run End: {self.run_id}", center=True, symbol="*")

        return run_response

    def _run_stream(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        stream_intermediate_steps: bool = False,
    ) -> Iterator[Union[TeamRunResponseEvent, RunResponseEvent]]:
        """Run the Team and yield run events as they are produced.

        Steps:
        1. Reason about the task(s) if reasoning is enabled
        2. Get a streaming response from the model
        3. Add the run to memory
        4. Update Team Memory
        5. Save session to storage
        6. Log the team run

        Args:
            run_response: Response object populated in place for this run.
            run_messages: Prepared messages to send to the model.
            session_id: Identifier of the session this run belongs to.
            user_id: Optional identifier of the requesting user.
            response_format: Optional structured-output format forwarded to
                the model.
            stream_intermediate_steps: If True, also yield run-started,
                memory-update and run-completed events.
        """

        # 1. Reason about the task(s) if reasoning is enabled
        yield from self._handle_reasoning_stream(
            run_response=run_response,
            run_messages=run_messages,
        )

        # Remember where the prepared messages end, so messages appended during
        # the model call can be identified when adding the run to memory.
        index_of_last_user_message = len(run_messages.messages)

        # Start the Run by yielding a RunStarted event
        if stream_intermediate_steps:
            yield create_team_run_response_started_event(run_response)

        # 2. Get a response from the model
        yield from self._handle_model_response_stream(
            run_response=run_response,
            run_messages=run_messages,
            response_format=response_format,
            stream_intermediate_steps=stream_intermediate_steps,
        )

        # 3. Add the run to memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=index_of_last_user_message,
        )

        # 4. Update Team Memory
        yield from self._update_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
        )

        # 5. Save session to storage
        self.write_to_storage(session_id=session_id, user_id=user_id)

        # 6. Log Team Run
        self._log_team_run(session_id=session_id, user_id=user_id)

        if stream_intermediate_steps:
            yield create_team_run_response_completed_event(
                from_run_response=run_response,
            )

        log_debug(f"Team Run End: {self.run_id}", center=True, symbol="*")

    # Overload: stream=False (default) — arun returns the final TeamRunResponse.
    @overload
    async def arun(
        self,
        message: Union[str, List, Dict, Message],
        *,
        stream: Literal[False] = False,
        stream_intermediate_steps: Optional[bool] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        retries: Optional[int] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> TeamRunResponse: ...

    # Overload: stream=True — arun returns an async iterator of run events.
    @overload
    async def arun(
        self,
        message: Union[str, List, Dict, Message],
        *,
        stream: Literal[True] = True,
        stream_intermediate_steps: Optional[bool] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        retries: Optional[int] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> AsyncIterator[Union[RunResponseEvent, TeamRunResponseEvent]]: ...

    async def arun(
        self,
        message: Union[str, List, Dict, Message],
        *,
        stream: Optional[bool] = None,
        stream_intermediate_steps: Optional[bool] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        retries: Optional[int] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> Union[
        TeamRunResponse, AsyncIterator[Union[RunResponseEvent, TeamRunResponseEvent]]
    ]:
        """Run the Team asynchronously and return the response.

        Args:
            message: The user input for this run.
            stream: If True (and the model is streamable), return an async
                iterator of run events instead of a final response.
            stream_intermediate_steps: If True, also stream intermediate-step
                events (only applies when streaming).
            session_id: Session to run under; generated if not provided and
                the team has none.
            user_id: Optional user identifier; defaults to the team's user_id.
            retries: Number of retries on ModelProviderError. Defaults to 3;
                an explicit 0 means a single attempt.
            audio: Optional audio inputs attached to the message.
            images: Optional image inputs attached to the message.
            videos: Optional video inputs attached to the message.
            files: Optional file inputs attached to the message.
            knowledge_filters: Optional metadata filters for knowledge search.

        Returns:
            A TeamRunResponse, or an async iterator of run events when
            streaming.
        """

        self._reset_run_state()

        if session_id is not None:
            # Reset session state if a session_id is provided. Session name and session state will be loaded from storage.
            self._reset_session_state()

        # Use the default user_id and session_id when necessary
        if user_id is None:
            user_id = self.user_id

        if session_id is None or session_id == "":
            # Default to the team's session_id if no session_id is provided
            if not (self.session_id is None or self.session_id == ""):
                session_id = self.session_id
            else:
                # Generate a new session_id and store it in the team
                session_id = str(uuid4())
                self.session_id = session_id

        session_id = cast(str, session_id)

        self._initialize_session_state(user_id=user_id, session_id=session_id)

        log_debug(f"Session ID: {session_id}", center=True)

        self.initialize_team(session_id=session_id)

        effective_filters = knowledge_filters

        # When filters are passed manually
        if self.knowledge_filters or knowledge_filters:
            """
                initialize metadata (specially required in case when load is commented out)
                when load is not called the reader's document_lists won't be called and metadata filters won't be initialized
                so we need to call initialize_valid_filters to make sure the filters are initialized
            """
            if not self.knowledge.valid_metadata_filters:  # type: ignore
                self.knowledge.initialize_valid_filters()  # type: ignore

            effective_filters = self._get_team_effective_filters(knowledge_filters)

        # Use stream override value when necessary
        if stream is None:
            stream = False if self.stream is None else self.stream

        if stream_intermediate_steps is None:
            stream_intermediate_steps = (
                False
                if self.stream_intermediate_steps is None
                else self.stream_intermediate_steps
            )

        # Can't have stream_intermediate_steps if stream is False
        if stream is False:
            stream_intermediate_steps = False

        self.stream = self.stream or (stream and self.is_streamable)
        self.stream_intermediate_steps = self.stream_intermediate_steps or (
            stream_intermediate_steps and self.stream
        )

        # Read existing session from storage
        self.read_from_storage(session_id=session_id)

        # Resolve the run context if one was provided
        if self.context is not None:
            self._resolve_run_context()

        if self.response_model is not None and self.parse_response and stream is True:
            # Disable stream if response_model is set
            stream = False
            log_debug("Disabling stream as response_model is set")

        # Configure the model for runs
        self._set_default_model()
        response_format = self._get_response_format()

        self.model = cast(Model, self.model)
        self.determine_tools_for_model(
            model=self.model,
            session_id=session_id,
            user_id=user_id,
            async_mode=True,
            knowledge_filters=effective_filters,
            message=message,
            images=images,
            videos=videos,
            audio=audio,
            files=files,
        )

        # Register the team in the background; do not block the run on it
        asyncio.create_task(self._aregister_team())

        # Create a run_id for this specific run
        run_id = str(uuid4())

        # Create a new run_response for this attempt
        run_response = TeamRunResponse(
            run_id=run_id, session_id=session_id, team_id=self.team_id
        )

        run_response.model = self.model.id if self.model is not None else None
        run_response.model_provider = (
            self.model.provider if self.model is not None else None
        )

        self.run_response = run_response
        self.run_id = run_id

        # Default to 3 retries; use an `is None` test so an explicit
        # `retries=0` (single attempt) is respected.
        retries = 3 if retries is None else retries

        # Run the team
        last_exception = None
        num_attempts = retries + 1

        for attempt in range(num_attempts):
            log_debug(f"Team Run Start: {self.run_id}", center=True)
            log_debug(f"Mode: '{self.mode}'", center=True)

            # Set run_input
            if message is not None:
                if isinstance(message, str):
                    self.run_input = message
                elif isinstance(message, Message):
                    self.run_input = message.to_dict()
                else:
                    self.run_input = message

            # Run the team
            try:
                # Prepare run messages (the same preparation applies to every
                # mode, including "route")
                run_messages: RunMessages = self.get_run_messages(
                    session_id=session_id,
                    user_id=user_id,
                    message=message,
                    audio=audio,
                    images=images,
                    videos=videos,
                    files=files,
                    knowledge_filters=effective_filters,
                    **kwargs,
                )

                if stream and self.is_streamable:
                    response_iterator = self._arun_stream(
                        run_response=self.run_response,
                        run_messages=run_messages,
                        session_id=session_id,
                        user_id=user_id,
                        response_format=response_format,
                        stream_intermediate_steps=stream_intermediate_steps,
                    )
                    return response_iterator
                else:
                    return await self._arun(
                        run_response=self.run_response,
                        run_messages=run_messages,
                        session_id=session_id,
                        user_id=user_id,
                        response_format=response_format,
                    )

            except ModelProviderError as e:
                log_warning(f"Attempt {attempt + 1}/{num_attempts} failed: {str(e)}")
                last_exception = e
                if attempt < num_attempts - 1:
                    # Exponential backoff between attempts
                    await asyncio.sleep(2**attempt)
            except (KeyboardInterrupt, RunCancelledException):
                if stream and self.is_streamable:
                    return self._async_generator_wrapper(
                        create_team_run_response_cancelled_event(
                            run_response, "Operation cancelled by user"
                        )
                    )
                else:
                    return self._create_run_response(
                        run_state=RunStatus.cancelled,
                        content="Operation cancelled by user",
                        from_run_response=run_response,
                        session_id=session_id,
                    )

        # If we get here, all retries failed
        if last_exception is not None:
            log_error(
                f"Failed after {num_attempts} attempts. Last error using {last_exception.model_name}({last_exception.model_id})"
            )
            if stream and self.is_streamable:
                return self._async_generator_wrapper(
                    create_team_run_response_error_event(
                        run_response, error=str(last_exception)
                    )
                )

            raise last_exception
        else:
            if stream and self.is_streamable:
                return self._async_generator_wrapper(
                    create_team_run_response_error_event(
                        run_response, error=str(last_exception)
                    )
                )

            raise Exception(f"Failed after {num_attempts} attempts.")

    async def _arun(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
    ) -> TeamRunResponse:
        """Run the Team asynchronously and return the populated response.

        Steps:
        1. Reason about the task(s) if reasoning is enabled
        2. Get a response from the model
        3. Add the run to memory
        4. Update Team Memory
        5. Save session to storage
        6. Parse any structured outputs
        7. Log the team run

        Args:
            run_response: Response object populated in place for this run.
            run_messages: Prepared messages to send to the model.
            session_id: Identifier of the session this run belongs to.
            user_id: Optional identifier of the requesting user.
            response_format: Optional structured-output format (dict or
                Pydantic model class) forwarded to the model.

        Returns:
            The same ``run_response`` object, fully populated.
        """

        self.model = cast(Model, self.model)

        # 1. Reason about the task(s) if reasoning is enabled
        await self._ahandle_reasoning(
            run_response=run_response, run_messages=run_messages
        )

        # Remember where the prepared messages end, so messages appended during
        # the model call can be identified when adding the run to memory.
        index_of_last_user_message = len(run_messages.messages)

        # 2. Get the model response for the team leader
        model_response = await self.model.aresponse(
            messages=run_messages.messages,
            response_format=response_format,
            tools=self._tools_for_model,
            functions=self._functions_for_model,
            tool_choice=self.tool_choice,
            tool_call_limit=self.tool_call_limit,
        )  # type: ignore

        # Copy the model response onto the TeamRunResponse
        self._update_run_response(
            model_response=model_response,
            run_response=run_response,
            run_messages=run_messages,
        )

        # 3. Add the run to memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=index_of_last_user_message,
        )
        # 4. Update Team Memory — drain the async generator; nothing is
        # streamed from this non-streaming path.
        async for _ in self._aupdate_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
        ):
            pass

        # 5. Save session to storage
        self.write_to_storage(session_id=session_id, user_id=user_id)

        # 6. Parse team response model
        self._convert_response_to_structured_format(run_response=run_response)

        # 7. Log Team Run
        await self._alog_team_run(session_id=session_id, user_id=user_id)

        log_debug(f"Team Run End: {self.run_id}", center=True, symbol="*")

        return run_response

    async def _arun_stream(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        stream_intermediate_steps: bool = False,
    ) -> AsyncIterator[Union[TeamRunResponseEvent, RunResponseEvent]]:
        """Run the Team asynchronously and yield run events as they are produced.

        Steps:
        1. Reason about the task(s) if reasoning is enabled
        2. Get a streaming response from the model
        3. Add the run to memory
        4. Update Team Memory
        5. Save session to storage
        6. Log the team run

        Args:
            run_response: Response object populated in place for this run.
            run_messages: Prepared messages to send to the model.
            session_id: Identifier of the session this run belongs to.
            user_id: Optional identifier of the requesting user.
            response_format: Optional structured-output format forwarded to
                the model.
            stream_intermediate_steps: If True, also yield run-started,
                memory-update and run-completed events.
        """

        # 1. Reason about the task(s) if reasoning is enabled
        async for item in self._ahandle_reasoning_stream(
            run_response=run_response, run_messages=run_messages
        ):
            yield item

        # Remember where the prepared messages end, so messages appended during
        # the model call can be identified when adding the run to memory.
        index_of_last_user_message = len(run_messages.messages)

        # Start the Run by yielding a RunStarted event
        if stream_intermediate_steps:
            yield create_team_run_response_started_event(from_run_response=run_response)

        # 2. Get a response from the model
        async for event in self._ahandle_model_response_stream(
            run_response=run_response,
            run_messages=run_messages,
            response_format=response_format,
            stream_intermediate_steps=stream_intermediate_steps,
        ):
            yield event

        # 3. Add the run to memory
        self._add_run_to_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            index_of_last_user_message=index_of_last_user_message,
        )

        # 4. Update Team Memory
        async for event in self._aupdate_memory(
            run_response=run_response,
            run_messages=run_messages,
            session_id=session_id,
            user_id=user_id,
        ):
            yield event

        # 5. Save session to storage
        self.write_to_storage(session_id=session_id, user_id=user_id)

        # 6. Log Team Run
        await self._alog_team_run(session_id=session_id, user_id=user_id)

        if stream_intermediate_steps:
            yield create_team_run_response_completed_event(
                from_run_response=run_response
            )

        log_debug(f"Team Run End: {self.run_id}", center=True, symbol="*")

    def _update_run_response(
        self,
        model_response: ModelResponse,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
    ):
        """Copy a completed model response onto the TeamRunResponse.

        Transfers content (structured or text), thinking, citations, tool
        executions, audio, timestamps, messages and aggregated metrics, and
        extracts reasoning content from think/analyze tool calls.

        Args:
            model_response: The finished response from the model.
            run_response: The run response to populate in place.
            run_messages: The messages exchanged during the run; those flagged
                `add_to_agent_memory` are attached to the run response.
        """
        # Handle structured outputs
        if (
            (self.response_model is not None)
            and not self.use_json_mode
            and (model_response.parsed is not None)
        ):
            # Update the run_response content with the structured output
            run_response.content = model_response.parsed
            # Update the run_response content_type with the structured output class name
            run_response.content_type = self.response_model.__name__
        elif model_response.content is not None:
            # Append text content. The None-guard above prevents a TypeError
            # from `+=` when the model returned no text content.
            if not run_response.content:
                run_response.content = model_response.content
            else:
                run_response.content += model_response.content

        # Update the run_response thinking with the model response thinking
        if model_response.thinking is not None:
            if not run_response.thinking:
                run_response.thinking = model_response.thinking
            else:
                run_response.thinking += model_response.thinking

        # Update citations
        if model_response.citations is not None:
            run_response.citations = model_response.citations

        # Update the run_response tools with the model response tool_executions
        if model_response.tool_executions is not None:
            if run_response.tools is None:
                run_response.tools = model_response.tool_executions
            else:
                run_response.tools.extend(model_response.tool_executions)

        run_response.formatted_tool_calls = format_tool_calls(run_response.tools or [])

        # Update the run_response audio with the model response audio
        if model_response.audio is not None:
            run_response.response_audio = model_response.audio

        # Update the run_response created_at with the model response created_at
        run_response.created_at = model_response.created_at

        # Build a list of messages that should be added to the RunResponse
        messages_for_run_response = [
            m for m in run_messages.messages if m.add_to_agent_memory
        ]

        # Update the TeamRunResponse messages
        run_response.messages = messages_for_run_response

        # Update the TeamRunResponse metrics
        run_response.metrics = self._aggregate_metrics_from_messages(
            messages_for_run_response
        )

        # Surface reasoning content from think/analyze tool calls
        for tool_call in model_response.tool_calls or []:
            tool_name = tool_call.get("tool_name", "")
            if tool_name.lower() in ["think", "analyze"]:
                tool_args = tool_call.get("tool_args", {})
                self.update_reasoning_content_from_tool_call(
                    run_response, tool_name, tool_args
                )

    def _add_run_to_memory(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
        session_id: str,
        index_of_last_user_message: int = 0,
    ):
        """Record the completed run in the team's memory backend.

        Supports the legacy TeamMemory backend and the v2 Memory backend;
        any other memory type is left untouched.

        Args:
            run_response: The response of the run that just finished.
            run_messages: The messages exchanged during the run.
            session_id: Session the run belongs to (v2 Memory only).
            index_of_last_user_message: Index into ``run_messages.messages``
                after which model-generated messages begin.
        """
        # Narrow the declared type of self.memory for type-checkers
        # (cast is a no-op at runtime).
        self.memory = (
            cast(TeamMemory, self.memory)
            if isinstance(self.memory, TeamMemory)
            else cast(Memory, self.memory)
        )

        if isinstance(self.memory, TeamMemory):
            # Persist the system message first, if there is one
            if run_messages.system_message is not None:
                self.memory.add_system_message(
                    run_messages.system_message,
                    system_message_role=self.system_message_role,
                )

            # Collect the user message plus every message produced after it
            # that is flagged for memory
            memory_messages: List[Message] = []
            if run_messages.user_message is not None:
                memory_messages.append(run_messages.user_message)
            for candidate in run_messages.messages[index_of_last_user_message:]:
                if candidate.add_to_agent_memory:
                    memory_messages.append(candidate)
            if memory_messages:
                self.memory.add_messages(messages=memory_messages)

            team_run = TeamRun(response=run_response)
            team_run.message = run_messages.user_message

            # Update user memories from the user message if configured.
            # NOTE(review): _update_memory performs the same update under the
            # same conditions — confirm the double update is intentional.
            should_update_user_memories = (
                self.memory is not None
                and self.memory.create_user_memories
                and self.memory.update_user_memories_after_run
                and run_messages.user_message is not None
            )
            if should_update_user_memories:
                self.memory.update_memory(
                    input=run_messages.user_message.get_content_string()
                )  # type: ignore

            # Record the run itself
            self.memory.add_team_run(team_run)  # type: ignore

        elif isinstance(self.memory, Memory):
            # v2 Memory tracks runs per session
            self.memory.add_run(session_id=session_id, run=run_response)

    def _update_memory(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> Iterator[TeamRunResponseEvent]:
        """Update the memory backend after a run and recompute session metrics.

        For the legacy TeamMemory backend this yields memory-update
        started/completed events when `stream_intermediate_steps` is enabled;
        for the v2 Memory backend it yields whatever
        `_make_memories_and_summaries` produces.

        Args:
            run_response: The response of the run that just finished.
            run_messages: The messages exchanged during the run.
            session_id: Identifier of the session this run belongs to.
            user_id: Optional identifier of the requesting user.
        """
        if isinstance(self.memory, TeamMemory):
            # Update the user memories from the user message if configured
            if (
                self.memory is not None
                and self.memory.create_user_memories
                and self.memory.update_user_memories_after_run
                and run_messages.user_message is not None
            ):
                if self.stream_intermediate_steps:
                    yield create_team_memory_update_started_event(
                        from_run_response=run_response
                    )
                self.memory.update_memory(
                    input=run_messages.user_message.get_content_string()
                )  # type: ignore

                if self.stream_intermediate_steps:
                    yield create_team_memory_update_completed_event(
                        from_run_response=run_response
                    )

            # Recompute session metrics from the legacy memory's messages
            self.session_metrics = self._calculate_session_metrics(self.memory.messages)
            self.full_team_session_metrics = self._calculate_full_team_session_metrics(
                self.memory.messages
            )
        elif isinstance(self.memory, Memory):
            yield from self._make_memories_and_summaries(
                run_messages, session_id, user_id
            )

            # Collect every message from this session's runs. Guard against
            # `runs` being unset — mirrors the check in _aupdate_memory.
            session_messages: List[Message] = []
            if self.memory.runs:
                for run in self.memory.runs.get(session_id, []):  # type: ignore
                    if run.messages is not None:
                        for m in run.messages:
                            session_messages.append(m)

            # Calculate session metrics
            self.session_metrics = self._calculate_session_metrics(session_messages)

    async def _aupdate_memory(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
        session_id: str,
        user_id: Optional[str] = None,
    ) -> AsyncIterator[TeamRunResponseEvent]:
        """Async twin of _update_memory: update memory and recompute metrics.

        For the legacy TeamMemory backend this yields memory-update
        started/completed events when `stream_intermediate_steps` is enabled;
        for the v2 Memory backend it yields whatever
        `_amake_memories_and_summaries` produces.

        Args:
            run_response: The response of the run that just finished.
            run_messages: The messages exchanged during the run.
            session_id: Identifier of the session this run belongs to.
            user_id: Optional identifier of the requesting user.
        """
        if isinstance(self.memory, TeamMemory):
            # Update the memories with the user message if needed
            if (
                self.memory is not None
                and self.memory.create_user_memories
                and self.memory.update_user_memories_after_run
                and run_messages.user_message is not None
            ):
                if self.stream_intermediate_steps:
                    yield create_team_memory_update_started_event(
                        from_run_response=run_response
                    )

                await self.memory.aupdate_memory(
                    input=run_messages.user_message.get_content_string()
                )

                if self.stream_intermediate_steps:
                    yield create_team_memory_update_completed_event(
                        from_run_response=run_response
                    )

            # Calculate session metrics
            self.session_metrics = self._calculate_session_metrics(self.memory.messages)
            self.full_team_session_metrics = self._calculate_full_team_session_metrics(
                self.memory.messages
            )

        elif isinstance(self.memory, Memory):
            async for event in self._amake_memories_and_summaries(
                run_messages, session_id, user_id
            ):
                yield event

            # Collect every message from this session's runs
            session_messages: List[Message] = []
            if self.memory.runs:
                for run in self.memory.runs.get(session_id, []):
                    if run.messages is not None:
                        for m in run.messages:
                            session_messages.append(m)

            # Calculate session metrics
            self.session_metrics = self._calculate_session_metrics(session_messages)

    def _handle_model_response_stream(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        stream_intermediate_steps: bool = False,
    ) -> Iterator[Union[TeamRunResponseEvent, RunResponseEvent]]:
        """Stream the model response and consolidate it into ``run_response``.

        Chunk events are delegated to ``_handle_model_response_chunk`` and
        yielded as they arrive; once the stream is exhausted the accumulated
        content, messages and metrics are written back to ``run_response``.

        Args:
            run_response: The team run response being built for this run.
            run_messages: The messages to send to the model.
            response_format: Optional response format (dict or Pydantic model).
            stream_intermediate_steps: Whether to emit intermediate-step events.

        Yields:
            Team/member run response events.
        """
        self.model = cast(Model, self.model)

        # Mutable state shared with the chunk handler so the reasoning-started
        # event is emitted only once and reasoning time can be accumulated.
        reasoning_state = {
            "reasoning_started": False,
            "reasoning_time_taken": 0.0,
        }

        full_model_response = ModelResponse()
        for model_response_event in self.model.response_stream(
            messages=run_messages.messages,
            response_format=response_format,
            tools=self._tools_for_model,
            functions=self._functions_for_model,
            tool_choice=self.tool_choice,
            tool_call_limit=self.tool_call_limit,
        ):
            yield from self._handle_model_response_chunk(
                run_response=run_response,
                full_model_response=full_model_response,
                model_response_event=model_response_event,
                stream_intermediate_steps=stream_intermediate_steps,
                reasoning_state=reasoning_state,
            )

        # Handle structured outputs (kept consistent with the async variant,
        # _ahandle_model_response_stream): with a native structured-output
        # model (not JSON mode), prefer the parsed object as the run content.
        if (
            (self.response_model is not None)
            and not self.use_json_mode
            and (full_model_response.parsed is not None)
        ):
            # Update the run_response content with the structured output
            run_response.content = full_model_response.parsed

        # Update TeamRunResponse with the accumulated model response
        run_response.created_at = full_model_response.created_at
        if full_model_response.content is not None:
            run_response.content = full_model_response.content
        if full_model_response.thinking is not None:
            run_response.thinking = full_model_response.thinking
        if full_model_response.audio is not None:
            run_response.response_audio = full_model_response.audio
        if full_model_response.citations is not None:
            run_response.citations = full_model_response.citations

        if stream_intermediate_steps and reasoning_state["reasoning_started"]:
            all_reasoning_steps: List[ReasoningStep] = []
            # Reasoning steps are collected on self.run_response.extra_data by
            # the chunk handler as think/analyze tool calls complete.
            if (
                self.run_response
                and self.run_response.extra_data
                and hasattr(self.run_response.extra_data, "reasoning_steps")
            ):
                all_reasoning_steps = cast(
                    List[ReasoningStep], self.run_response.extra_data.reasoning_steps
                )

            if all_reasoning_steps:
                self._add_reasoning_metrics_to_extra_data(
                    run_response, reasoning_state["reasoning_time_taken"]
                )
                yield create_team_reasoning_completed_event(
                    from_run_response=run_response,
                    content=ReasoningSteps(reasoning_steps=all_reasoning_steps),
                    content_type=ReasoningSteps.__name__,
                )

        # Build a list of messages that should be added to the RunResponse
        messages_for_run_response = [
            m for m in run_messages.messages if m.add_to_agent_memory
        ]
        # Update the TeamRunResponse messages
        run_response.messages = messages_for_run_response
        # Update the TeamRunResponse metrics
        run_response.metrics = self._aggregate_metrics_from_messages(
            messages_for_run_response
        )
        # NOTE: the previous trailing re-assignment of response_audio was
        # removed — it duplicated the assignment above with no intervening
        # writes to run_response.response_audio.

    async def _ahandle_model_response_stream(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
        response_format: Optional[Union[Dict, Type[BaseModel]]] = None,
        stream_intermediate_steps: bool = False,
    ) -> AsyncIterator[Union[TeamRunResponseEvent, RunResponseEvent]]:
        """Asynchronously stream the model response and consolidate it into
        ``run_response``.

        Chunk events are delegated to ``_handle_model_response_chunk`` and
        yielded as they arrive; once the stream is exhausted the accumulated
        content, messages and metrics are written back to ``run_response``.

        Args:
            run_response: The team run response being built for this run.
            run_messages: The messages to send to the model.
            response_format: Optional response format (dict or Pydantic model).
            stream_intermediate_steps: Whether to emit intermediate-step events.

        Yields:
            Team/member run response events.
        """
        self.model = cast(Model, self.model)

        # Mutable state shared with the chunk handler so the reasoning-started
        # event is emitted only once and reasoning time can be accumulated.
        reasoning_state = {
            "reasoning_started": False,
            "reasoning_time_taken": 0.0,
        }
        full_model_response = ModelResponse()
        model_stream = self.model.aresponse_stream(
            messages=run_messages.messages,
            response_format=response_format,
            tools=self._tools_for_model,
            functions=self._functions_for_model,
            tool_choice=self.tool_choice,
            tool_call_limit=self.tool_call_limit,
        )  # type: ignore
        async for model_response_event in model_stream:
            # Per-chunk processing (content, tool calls, reasoning) is shared
            # with the synchronous streaming path.
            for chunk in self._handle_model_response_chunk(
                run_response=run_response,
                full_model_response=full_model_response,
                model_response_event=model_response_event,
                stream_intermediate_steps=stream_intermediate_steps,
                reasoning_state=reasoning_state,
            ):
                yield chunk

        # Handle structured outputs
        if (
            (self.response_model is not None)
            and not self.use_json_mode
            and (full_model_response.parsed is not None)
        ):
            # Update the run_response content with the structured output
            run_response.content = full_model_response.parsed

        # Update TeamRunResponse
        run_response.created_at = full_model_response.created_at
        if full_model_response.content is not None:
            # NOTE(review): when raw content is also present this overwrites
            # the parsed structured output assigned just above — confirm this
            # precedence is intended.
            run_response.content = full_model_response.content
        if full_model_response.thinking is not None:
            run_response.thinking = full_model_response.thinking
        if full_model_response.audio is not None:
            run_response.response_audio = full_model_response.audio
        if full_model_response.citations is not None:
            run_response.citations = full_model_response.citations

        # Build a list of messages that should be added to the RunResponse
        messages_for_run_response = [
            m for m in run_messages.messages if m.add_to_agent_memory
        ]
        # Update the TeamRunResponse messages
        run_response.messages = messages_for_run_response
        # Update the TeamRunResponse metrics
        run_response.metrics = self._aggregate_metrics_from_messages(
            messages_for_run_response
        )

        if stream_intermediate_steps and reasoning_state["reasoning_started"]:
            all_reasoning_steps: List[ReasoningStep] = []
            # Reasoning steps are collected on self.run_response.extra_data by
            # the chunk handler as think/analyze tool calls complete.
            if (
                self.run_response
                and self.run_response.extra_data
                and hasattr(self.run_response.extra_data, "reasoning_steps")
            ):
                all_reasoning_steps = cast(
                    List[ReasoningStep], self.run_response.extra_data.reasoning_steps
                )

            if all_reasoning_steps:
                self._add_reasoning_metrics_to_extra_data(
                    run_response, reasoning_state["reasoning_time_taken"]
                )
                yield create_team_reasoning_completed_event(
                    from_run_response=run_response,
                    content=ReasoningSteps(reasoning_steps=all_reasoning_steps),
                    content_type=ReasoningSteps.__name__,
                )

    def _handle_model_response_chunk(
        self,
        run_response: TeamRunResponse,
        full_model_response: ModelResponse,
        model_response_event: Union[
            ModelResponse, TeamRunResponseEvent, RunResponseEvent
        ],
        reasoning_state: Dict[str, Any],
        stream_intermediate_steps: bool = False,
    ) -> Iterator[Union[TeamRunResponseEvent, RunResponseEvent]]:
        """Process a single streamed event from the model.

        Accumulates assistant content/thinking/citations/audio into
        ``full_model_response``, records tool calls on ``run_response``, and
        yields the corresponding team events. Events that are already
        ``RunResponseEvent``/``TeamRunResponseEvent`` instances (from member
        agents/teams) are bubbled up unchanged.

        Args:
            run_response: The team run response being built for this run.
            full_model_response: Accumulator for the streamed model response.
            model_response_event: The streamed event to process.
            reasoning_state: Mutable dict with keys ``"reasoning_started"``
                and ``"reasoning_time_taken"``, shared across all chunks of
                one model stream.
            stream_intermediate_steps: Whether to emit reasoning events.
        """
        if isinstance(
            model_response_event, tuple(get_args(RunResponseEvent))
        ) or isinstance(model_response_event, tuple(get_args(TeamRunResponseEvent))):
            # We just bubble the event up
            yield model_response_event  # type: ignore
        else:
            model_response_event = cast(ModelResponse, model_response_event)
            # If the model response is an assistant_response, yield a RunResponse
            if (
                model_response_event.event
                == ModelResponseEvent.assistant_response.value
            ):
                should_yield = False
                # Process content and thinking
                if model_response_event.content is not None:
                    if not full_model_response.content:
                        full_model_response.content = model_response_event.content
                    else:
                        full_model_response.content += model_response_event.content
                    should_yield = True

                # Process thinking
                if model_response_event.thinking is not None:
                    if not full_model_response.thinking:
                        full_model_response.thinking = model_response_event.thinking
                    else:
                        full_model_response.thinking += model_response_event.thinking
                    should_yield = True

                if model_response_event.citations is not None:
                    # We get citations in one chunk
                    full_model_response.citations = model_response_event.citations
                    should_yield = True

                # Process audio
                if model_response_event.audio is not None:
                    # Streamed audio arrives in fragments; merge each fragment
                    # into a single accumulated AudioResponse.
                    if full_model_response.audio is None:
                        full_model_response.audio = AudioResponse(
                            id=str(uuid4()), content="", transcript=""
                        )

                    if model_response_event.audio.id is not None:
                        full_model_response.audio.id = model_response_event.audio.id  # type: ignore
                    if model_response_event.audio.content is not None:
                        full_model_response.audio.content += (
                            model_response_event.audio.content
                        )  # type: ignore
                    if model_response_event.audio.transcript is not None:
                        full_model_response.audio.transcript += (
                            model_response_event.audio.transcript
                        )  # type: ignore
                    if model_response_event.audio.expires_at is not None:
                        full_model_response.audio.expires_at = (
                            model_response_event.audio.expires_at
                        )  # type: ignore
                    if model_response_event.audio.mime_type is not None:
                        full_model_response.audio.mime_type = (
                            model_response_event.audio.mime_type
                        )  # type: ignore
                    if model_response_event.audio.sample_rate is not None:
                        full_model_response.audio.sample_rate = (
                            model_response_event.audio.sample_rate
                        )
                    if model_response_event.audio.channels is not None:
                        full_model_response.audio.channels = (
                            model_response_event.audio.channels
                        )

                    # Yield the audio and transcript bit by bit
                    should_yield = True

                if model_response_event.image is not None:
                    self.add_image(model_response_event.image)

                    should_yield = True

                # Only yield the chunk
                if should_yield:
                    yield create_team_run_response_content_event(
                        from_run_response=run_response,
                        content=model_response_event.content,
                        thinking=model_response_event.thinking,
                        redacted_thinking=model_response_event.redacted_thinking,
                        response_audio=full_model_response.audio,
                        citations=model_response_event.citations,
                        image=model_response_event.image,
                    )

            # If the model response is a tool_call_started, add the tool call to the run_response
            elif (
                model_response_event.event == ModelResponseEvent.tool_call_started.value
            ):
                # Add tool calls to the run_response
                tool_executions_list = model_response_event.tool_executions
                if tool_executions_list is not None:
                    # Add tool calls to the agent.run_response
                    if run_response.tools is None:
                        run_response.tools = tool_executions_list
                    else:
                        run_response.tools.extend(tool_executions_list)

                    for tool in tool_executions_list:
                        yield create_team_tool_call_started_event(
                            from_run_response=run_response,
                            tool=tool,
                        )
                # Format tool calls whenever new ones are added during streaming
                run_response.formatted_tool_calls = format_tool_calls(
                    run_response.tools or []
                )

            # If the model response is a tool_call_completed, update the existing tool call in the run_response
            elif (
                model_response_event.event
                == ModelResponseEvent.tool_call_completed.value
            ):
                reasoning_step: Optional[ReasoningStep] = None
                tool_executions_list = model_response_event.tool_executions
                if tool_executions_list is not None:
                    # Update the existing tool call in the run_response
                    if run_response.tools:
                        # Create a mapping of tool_call_id to index
                        tool_call_index_map = {
                            tc.tool_call_id: i
                            for i, tc in enumerate(run_response.tools)
                            if tc.tool_call_id is not None
                        }
                        # Process tool calls
                        for tool_call_dict in tool_executions_list:
                            tool_call_id = tool_call_dict.tool_call_id or ""
                            index = tool_call_index_map.get(tool_call_id)
                            if index is not None:
                                run_response.tools[index] = tool_call_dict
                    else:
                        run_response.tools = tool_executions_list

                    # Only iterate through new tool calls
                    for tool_call in tool_executions_list:
                        tool_name = tool_call.tool_name or ""
                        # Completed think/analyze tool calls are treated as
                        # reasoning steps for the reasoning event stream.
                        if tool_name.lower() in ["think", "analyze"]:
                            tool_args = tool_call.tool_args or {}

                            reasoning_step = (
                                self.update_reasoning_content_from_tool_call(
                                    run_response, tool_name, tool_args
                                )
                            )

                            # Accumulate the tool's runtime as reasoning time
                            metrics = tool_call.metrics
                            if metrics is not None and metrics.time is not None:
                                reasoning_state["reasoning_time_taken"] = (
                                    reasoning_state["reasoning_time_taken"]
                                    + float(metrics.time)
                                )

                        yield create_team_tool_call_completed_event(
                            from_run_response=run_response,
                            tool=tool_call,
                            content=model_response_event.content,
                        )

                if stream_intermediate_steps:
                    if reasoning_step is not None:
                        # Emit the started event exactly once per stream
                        if not reasoning_state["reasoning_started"]:
                            yield create_team_reasoning_started_event(
                                from_run_response=run_response,
                            )
                            reasoning_state["reasoning_started"] = True

                        yield create_team_reasoning_step_event(
                            from_run_response=run_response,
                            reasoning_step=reasoning_step,
                            reasoning_content=run_response.reasoning_content or "",
                        )

    def _convert_response_to_structured_format(self, run_response: TeamRunResponse):
        """Parse the run response content into the configured response model.

        Checks the team-level ``response_model`` first (honoring
        ``parse_response``), then falls back to the member response model.
        Content that is already an instance of the target model is left
        untouched; parse failures are logged, never raised.
        """
        if self.response_model is not None and not isinstance(
            run_response.content, self.response_model
        ):
            if isinstance(run_response.content, str) and self.parse_response:
                self._parse_structured_content(run_response, self.response_model)
            else:
                log_warning(
                    "Something went wrong. Team run response content is not a string"
                )
        elif self._member_response_model is not None and not isinstance(
            run_response.content, self._member_response_model
        ):
            if isinstance(run_response.content, str):
                self._parse_structured_content(
                    run_response, self._member_response_model
                )
            else:
                log_warning(
                    "Something went wrong. Member run response content is not a string"
                )

    def _parse_structured_content(
        self, run_response: TeamRunResponse, model: Type[BaseModel]
    ) -> None:
        """Parse string content into ``model`` and update the response in place.

        Shared by the team and member response-model paths above; logs a
        warning (without raising) when parsing fails or yields None.
        """
        try:
            parsed_response_content = parse_response_model_str(
                run_response.content, model
            )

            # Update TeamRunResponse
            if parsed_response_content is not None:
                run_response.content = parsed_response_content
                run_response.content_type = model.__name__
            else:
                log_warning("Failed to convert response to response_model")
        except Exception as e:
            log_warning(f"Failed to convert response to output model: {e}")

    def _initialize_session_state(
        self, user_id: Optional[str] = None, session_id: Optional[str] = None
    ) -> None:
        self.session_state = self.session_state or {}

        if user_id is not None:
            self.session_state["current_user_id"] = user_id
            if self.team_session_state is not None:
                self.team_session_state["current_user_id"] = user_id

        if session_id is not None:
            self.session_state["current_session_id"] = session_id
            if self.team_session_state is not None:
                self.team_session_state["current_session_id"] = session_id

    def _make_memories_and_summaries(
        self, run_messages: RunMessages, session_id: str, user_id: Optional[str] = None
    ) -> Iterator[TeamRunResponseEvent]:
        """Create user memories and the session summary on worker threads.

        Yields memory-update started/completed events when streaming
        intermediate steps; individual operation failures are logged as
        warnings rather than raised.
        """
        from concurrent.futures import ThreadPoolExecutor, as_completed

        self.run_response = cast(TeamRunResponse, self.run_response)
        self.memory = cast(Memory, self.memory)

        user_message_str: Optional[str] = None
        if run_messages.user_message is not None:
            user_message_str = run_messages.user_message.get_content_string()

        # Run memory and summary creation concurrently on a small pool.
        with ThreadPoolExecutor(max_workers=3) as executor:
            pending = []
            if self.enable_user_memories and user_message_str:
                pending.append(
                    executor.submit(
                        self.memory.create_user_memories,
                        message=user_message_str,
                        user_id=user_id,
                    )
                )

            # Update the session summary if needed
            if self.enable_session_summaries:
                pending.append(
                    executor.submit(
                        self.memory.create_session_summary,
                        session_id=session_id,
                        user_id=user_id,
                    )  # type: ignore
                )

            if not pending:
                return

            if self.stream_intermediate_steps:
                yield create_team_memory_update_started_event(
                    from_run_response=self.run_response
                )

            # Wait for every operation; surface failures without aborting.
            for future in as_completed(pending):
                try:
                    future.result()
                except Exception as e:
                    log_warning(f"Error in memory/summary operation: {str(e)}")

            if self.stream_intermediate_steps:
                yield create_team_memory_update_completed_event(
                    from_run_response=self.run_response
                )

    async def _amake_memories_and_summaries(
        self, run_messages: RunMessages, session_id: str, user_id: Optional[str] = None
    ) -> AsyncIterator[TeamRunResponseEvent]:
        """Create user memories and the session summary concurrently (async).

        Yields memory-update started/completed events when streaming
        intermediate steps; gather failures are logged as warnings rather
        than raised.
        """
        self.memory = cast(Memory, self.memory)
        self.run_response = cast(TeamRunResponse, self.run_response)

        user_message_str: Optional[str] = None
        if run_messages.user_message is not None:
            user_message_str = run_messages.user_message.get_content_string()

        pending = []
        if self.enable_user_memories and user_message_str:
            pending.append(
                self.memory.acreate_user_memories(
                    message=user_message_str, user_id=user_id
                )
            )

        # Update the session summary if needed
        if self.enable_session_summaries:
            pending.append(
                self.memory.acreate_session_summary(
                    session_id=session_id, user_id=user_id
                )
            )  # type: ignore

        if not pending:
            return

        if self.stream_intermediate_steps:
            yield create_team_memory_update_started_event(
                from_run_response=self.run_response
            )

        # Run both coroutines concurrently; surface failures without aborting.
        try:
            await asyncio.gather(*pending)
        except Exception as e:
            log_warning(f"Error in memory/summary operation: {str(e)}")

        if self.stream_intermediate_steps:
            yield create_team_memory_update_completed_event(
                from_run_response=self.run_response
            )

    def _get_response_format(self) -> Optional[Union[Dict, Type[BaseModel]]]:
        """Select the response_format to send to the model.

        Returns None when no response model is configured (or when the model
        supports JSON-schema outputs but JSON mode is disabled); otherwise
        returns the response model itself, a JSON-schema format dict, or a
        plain JSON-object format depending on the model's capabilities.
        """
        self.model = cast(Model, self.model)
        if self.response_model is None:
            return None

        json_response_format = {"type": "json_object"}

        # Prefer native structured outputs when available and enabled.
        if self.model.supports_native_structured_outputs:
            if self.use_json_mode:
                log_debug(
                    "Model supports native structured outputs but it is not enabled. Using JSON mode instead."
                )
                return json_response_format
            log_debug("Setting Model.response_format to Agent.response_model")
            return self.response_model

        # Next best: JSON-schema constrained outputs (only in JSON mode).
        if self.model.supports_json_schema_outputs:
            if not self.use_json_mode:
                return None
            log_debug("Setting Model.response_format to JSON response mode")
            return {
                "type": "json_schema",
                "json_schema": {
                    "name": self.response_model.__name__,
                    "schema": self.response_model.model_json_schema(),
                },
            }

        # Fallback: plain JSON-object mode.
        log_debug("Model does not support structured or JSON schema outputs.")
        return json_response_format

    ###########################################################################
    # Print Response
    ###########################################################################

    def print_response(
        self,
        message: Optional[Union[List, Dict, str, Message]] = None,
        *,
        stream: bool = False,
        stream_intermediate_steps: bool = False,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        show_message: bool = True,
        show_reasoning: bool = True,
        show_full_reasoning: bool = False,
        console: Optional[Any] = None,
        tags_to_include_in_markdown: Optional[Set[str]] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        markdown: Optional[bool] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Run the team and pretty-print the response to the console.

        Dispatches to the streaming or non-streaming printer. When a
        response model is configured, streaming is disabled (structured
        output cannot be rendered incrementally).
        """
        if not tags_to_include_in_markdown:
            tags_to_include_in_markdown = {"think", "thinking"}

        # Keep the team-level markdown flag in sync with the argument.
        if markdown is None:
            markdown = self.markdown
        else:
            self.markdown = markdown

        # Structured output cannot be streamed incrementally.
        if self.response_model is not None:
            stream = False

        # Arguments shared by both printers.
        common_kwargs: Dict[str, Any] = dict(
            message=message,
            console=console,
            show_message=show_message,
            show_reasoning=show_reasoning,
            show_full_reasoning=show_full_reasoning,
            tags_to_include_in_markdown=tags_to_include_in_markdown,
            session_id=session_id,
            user_id=user_id,
            audio=audio,
            images=images,
            videos=videos,
            files=files,
            markdown=markdown,
            knowledge_filters=knowledge_filters,
        )

        if stream:
            self._print_response_stream(
                stream_intermediate_steps=stream_intermediate_steps,
                **common_kwargs,
                **kwargs,
            )
        else:
            self._print_response(**common_kwargs, **kwargs)

    def _print_response(
        self,
        message: Optional[Union[List, Dict, str, Message]] = None,
        console: Optional[Any] = None,
        show_message: bool = True,
        show_reasoning: bool = True,
        show_full_reasoning: bool = False,
        tags_to_include_in_markdown: Optional[Set[str]] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        markdown: bool = False,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        import textwrap

        from rich.console import Group
        from rich.json import JSON
        from rich.live import Live
        from rich.markdown import Markdown
        from rich.status import Status
        from rich.text import Text

        from agno.utils.response import format_tool_calls

        if not tags_to_include_in_markdown:
            tags_to_include_in_markdown = {"think", "thinking"}

        with Live(console=console) as live_console:
            status = Status(
                "Thinking...", spinner="aesthetic", speed=0.4, refresh_per_second=10
            )
            live_console.update(status)

            response_timer = Timer()
            response_timer.start()
            # Panels to be rendered
            panels = [status]
            # First render the message panel if the message is not None
            if message and show_message:
                # Convert message to a panel
                message_content = get_text_from_message(message)
                message_panel = create_panel(
                    content=Text(message_content, style="green"),
                    title="Message",
                    border_style="cyan",
                )
                panels.append(message_panel)
                live_console.update(Group(*panels))

            # Run the agent
            run_response: TeamRunResponse = self.run(  # type: ignore
                message=message,
                images=images,
                audio=audio,
                videos=videos,
                files=files,
                stream=False,
                session_id=session_id,
                user_id=user_id,
                knowledge_filters=knowledge_filters,
                **kwargs,
            )
            response_timer.stop()

            team_markdown = False
            member_markdown = {}
            if markdown:
                for member in self.members:
                    # Default every member to markdown rendering when markdown is on
                    if isinstance(member, Agent) and member.agent_id is not None:
                        member_markdown[member.agent_id] = True
                    if isinstance(member, Team) and member.team_id is not None:
                        member_markdown[member.team_id] = True
                team_markdown = True

            # Structured output (response_model) is rendered as JSON, so markdown
            # rendering is disabled for the team response.
            if self.response_model is not None:
                team_markdown = False

            # Likewise, disable markdown for any member that produces structured
            # output via its own response_model.
            for member in self.members:
                if (
                    member.response_model is not None
                    and isinstance(member, Agent)
                    and member.agent_id is not None
                ):
                    member_markdown[member.agent_id] = False  # type: ignore
                if (
                    member.response_model is not None
                    and isinstance(member, Team)
                    and member.team_id is not None
                ):
                    member_markdown[member.team_id] = False  # type: ignore

            # Handle reasoning
            reasoning_steps = []
            if (
                isinstance(run_response, TeamRunResponse)
                and run_response.extra_data is not None
                and run_response.extra_data.reasoning_steps is not None
            ):
                reasoning_steps = run_response.extra_data.reasoning_steps

            if len(reasoning_steps) > 0 and show_reasoning:
                # Create panels for reasoning steps
                for i, step in enumerate(reasoning_steps, 1):
                    reasoning_panel = self._build_reasoning_step_panel(
                        i, step, show_full_reasoning
                    )
                    panels.append(reasoning_panel)
                live_console.update(Group(*panels))

            if (
                isinstance(run_response, TeamRunResponse)
                and run_response.thinking is not None
            ):
                # Create panel for thinking
                thinking_panel = create_panel(
                    content=Text(run_response.thinking),
                    title=f"Thinking ({response_timer.elapsed:.1f}s)",
                    border_style="green",
                )
                panels.append(thinking_panel)
                live_console.update(Group(*panels))

            if isinstance(run_response, TeamRunResponse):
                # Handle member responses
                if self.show_members_responses:
                    for member_response in run_response.member_responses:
                        # Handle member reasoning
                        reasoning_steps = []
                        if (
                            isinstance(member_response, RunResponse)
                            and member_response.extra_data is not None
                            and member_response.extra_data.reasoning_steps is not None
                        ):
                            reasoning_steps.extend(
                                member_response.extra_data.reasoning_steps
                            )

                        if len(reasoning_steps) > 0 and show_reasoning:
                            # Create panels for reasoning steps
                            for i, step in enumerate(reasoning_steps, 1):
                                member_reasoning_panel = (
                                    self._build_reasoning_step_panel(
                                        i, step, show_full_reasoning, color="magenta"
                                    )
                                )
                                panels.append(member_reasoning_panel)

                        # Add tool calls panel for member if available
                        if (
                            self.show_tool_calls
                            and hasattr(member_response, "tools")
                            and member_response.tools
                        ):
                            # Resolve the member's display name from its id
                            member_name = None
                            if (
                                isinstance(member_response, RunResponse)
                                and member_response.agent_id is not None
                            ):
                                member_name = self._get_member_name(
                                    member_response.agent_id
                                )
                            elif (
                                isinstance(member_response, TeamRunResponse)
                                and member_response.team_id is not None
                            ):
                                member_name = self._get_member_name(
                                    member_response.team_id
                                )

                            if member_name:
                                formatted_calls = format_tool_calls(
                                    member_response.tools
                                )
                                if formatted_calls:
                                    console_width = console.width if console else 80
                                    # NOTE(review): this ADDS 30 to the console
                                    # width — confirm the intent was not to
                                    # subtract for panel borders/padding.
                                    panel_width = console_width + 30

                                    lines = []
                                    for call in formatted_calls:
                                        wrapped_call = textwrap.fill(
                                            f"• {call}",
                                            width=panel_width,
                                            subsequent_indent="  ",
                                        )
                                        lines.append(wrapped_call)

                                    # Blank line between tool-call entries
                                    tool_calls_text = "\n\n".join(lines)

                                    member_tool_calls_panel = create_panel(
                                        content=tool_calls_text,
                                        title=f"{member_name} Tool Calls",
                                        border_style="yellow",
                                    )
                                    panels.append(member_tool_calls_panel)
                                    live_console.update(Group(*panels))

                        # Decide whether this member's content renders as markdown
                        show_markdown = False
                        if member_markdown:
                            if (
                                isinstance(member_response, RunResponse)
                                and member_response.agent_id is not None
                            ):
                                show_markdown = member_markdown.get(
                                    member_response.agent_id, False
                                )
                            elif (
                                isinstance(member_response, TeamRunResponse)
                                and member_response.team_id is not None
                            ):
                                show_markdown = member_markdown.get(
                                    member_response.team_id, False
                                )

                        member_response_content: Union[str, JSON, Markdown] = (
                            self._parse_response_content(
                                member_response,
                                tags_to_include_in_markdown,
                                show_markdown=show_markdown,
                            )
                        )

                        # Create panel for member response
                        # NOTE(review): if neither branch below matches (response
                        # has no agent_id/team_id), member_response_panel is
                        # unbound — or stale from a prior iteration — when
                        # appended. Verify every member response carries an id.
                        if (
                            isinstance(member_response, RunResponse)
                            and member_response.agent_id is not None
                        ):
                            member_response_panel = create_panel(
                                content=member_response_content,
                                title=f"{self._get_member_name(member_response.agent_id)} Response",
                                border_style="magenta",
                            )
                        elif (
                            isinstance(member_response, TeamRunResponse)
                            and member_response.team_id is not None
                        ):
                            member_response_panel = create_panel(
                                content=member_response_content,
                                title=f"{self._get_member_name(member_response.team_id)} Response",
                                border_style="magenta",
                            )
                        panels.append(member_response_panel)

                        # Render the member's URL citations, if any
                        if (
                            member_response.citations is not None
                            and member_response.citations.urls is not None
                        ):
                            md_content = "\n".join(
                                f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                                for i, citation in enumerate(
                                    member_response.citations.urls
                                )
                                if citation.url  # Only include citations with valid URLs
                            )
                            if md_content:  # Only create panel if there are citations
                                citations_panel = create_panel(
                                    content=Markdown(md_content),
                                    title="Citations",
                                    border_style="magenta",
                                )
                                panels.append(citations_panel)

                    live_console.update(Group(*panels))

                # Add team level tool calls panel if available
                if self.show_tool_calls and run_response.tools:
                    formatted_calls = format_tool_calls(run_response.tools)
                    if formatted_calls:
                        console_width = console.width if console else 80
                        # Allow for panel borders and padding
                        # NOTE(review): same as above — width is console width
                        # PLUS 30; confirm the sign is intended.
                        panel_width = console_width + 30

                        lines = []
                        for call in formatted_calls:
                            wrapped_call = textwrap.fill(
                                f"• {call}", width=panel_width, subsequent_indent="  "
                            )  # Indent continuation lines
                            lines.append(wrapped_call)

                        # Join with blank lines between items
                        tool_calls_text = "\n\n".join(lines)

                        team_tool_calls_panel = create_panel(
                            content=tool_calls_text,
                            title="Team Tool Calls",
                            border_style="yellow",
                        )
                        panels.append(team_tool_calls_panel)
                        live_console.update(Group(*panels))

                # Render the team's own response content (markdown or JSON per
                # the team_markdown decision above)
                response_content_batch: Union[str, JSON, Markdown] = (
                    self._parse_response_content(
                        run_response,
                        tags_to_include_in_markdown,
                        show_markdown=team_markdown,
                    )
                )

                # Create panel for response
                response_panel = create_panel(
                    content=response_content_batch,
                    title=f"Response ({response_timer.elapsed:.1f}s)",
                    border_style="blue",
                )
                panels.append(response_panel)

                # Add citations
                if (
                    run_response.citations is not None
                    and run_response.citations.urls is not None
                ):
                    md_content = "\n".join(
                        f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                        for i, citation in enumerate(run_response.citations.urls)
                        if citation.url  # Only include citations with valid URLs
                    )
                    if md_content:  # Only create panel if there are citations
                        citations_panel = create_panel(
                            content=Markdown(md_content),
                            title="Citations",
                            border_style="green",
                        )
                        panels.append(citations_panel)

                # Surface memory / session-summary update notices (Memory v2 only)
                if self.memory is not None and isinstance(self.memory, Memory):
                    if (
                        self.memory.memory_manager is not None
                        and self.memory.memory_manager.memories_updated
                    ):
                        memory_panel = create_panel(
                            content=Text("Memories updated"),
                            title="Memories",
                            border_style="green",
                        )
                        panels.append(memory_panel)

                    if (
                        self.memory.summary_manager is not None
                        and self.memory.summary_manager.summary_updated
                    ):
                        summary_panel = create_panel(
                            content=Text("Session summary updated"),
                            title="Session Summary",
                            border_style="green",
                        )
                        panels.append(summary_panel)

            # Final update to remove the "Thinking..." status
            panels = [p for p in panels if not isinstance(p, Status)]
            live_console.update(Group(*panels))

    def _print_response_stream(
        self,
        message: Optional[Union[List, Dict, str, Message]] = None,
        console: Optional[Any] = None,
        show_message: bool = True,
        show_reasoning: bool = True,
        show_full_reasoning: bool = False,
        tags_to_include_in_markdown: Optional[Set[str]] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        markdown: bool = False,
        stream_intermediate_steps: bool = False,  # type: ignore
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Run the team with ``stream=True`` and render the response to the
        terminal as it arrives, using Rich ``Live`` panels.

        Panels rendered (in order): the input message, reasoning steps, model
        "thinking", per-member tool calls and responses, team tool calls, the
        team response, citations, and memory/session-summary update notices.

        Args:
            message: The message to send to the team.
            console: Optional Rich console to render into.
            show_message: Whether to render the input message panel.
            show_reasoning: Whether to render reasoning-step panels.
            show_full_reasoning: Whether to show full reasoning detail.
            tags_to_include_in_markdown: Tags preserved when escaping markdown;
                defaults to ``{"think", "thinking"}`` when falsy.
            session_id: Forwarded to ``self.run``.
            user_id: Forwarded to ``self.run``.
            audio: Audio inputs forwarded to ``self.run``.
            images: Image inputs forwarded to ``self.run``.
            videos: Video inputs forwarded to ``self.run``.
            files: File inputs forwarded to ``self.run``.
            markdown: Whether to render response content as markdown.
            stream_intermediate_steps: Ignored — forced to True below because
                the streaming printer needs intermediate events.
            knowledge_filters: Forwarded to ``self.run``.
            **kwargs: Additional arguments forwarded to ``self.run``.
        """
        import textwrap

        from rich.console import Group
        from rich.live import Live
        from rich.markdown import Markdown
        from rich.status import Status
        from rich.text import Text

        from agno.utils.response import format_tool_calls

        if not tags_to_include_in_markdown:
            tags_to_include_in_markdown = {"think", "thinking"}

        stream_intermediate_steps = (
            True  # With streaming print response, we need to stream intermediate steps
        )

        # Accumulated response text / thinking text across stream events
        _response_content: str = ""
        _response_thinking: str = ""
        reasoning_steps: List[ReasoningStep] = []

        # Track tool calls by member and team
        member_tool_calls = {}  # type: ignore
        team_tool_calls = []  # type: ignore

        # Track processed tool calls to avoid duplicates
        processed_tool_calls = set()

        with Live(console=console) as live_console:
            status = Status(
                "Thinking...", spinner="aesthetic", speed=0.4, refresh_per_second=10
            )
            live_console.update(status)
            response_timer = Timer()
            response_timer.start()
            # Flag which indicates if the panels should be rendered
            render = False
            # Panels to be rendered
            panels = [status]
            # First render the message panel if the message is not None
            if message and show_message:
                render = True
                # Convert message to a panel
                message_content = get_text_from_message(message)
                message_panel = create_panel(
                    content=Text(message_content, style="green"),
                    title="Message",
                    border_style="cyan",
                )
                panels.append(message_panel)
            if render:
                live_console.update(Group(*panels))

            # Get response from the team
            stream_resp = self.run(  # type: ignore
                message=message,
                audio=audio,
                images=images,
                videos=videos,
                files=files,
                stream=True,
                stream_intermediate_steps=stream_intermediate_steps,
                session_id=session_id,
                user_id=user_id,
                knowledge_filters=knowledge_filters,
                **kwargs,
            )

            team_markdown = None
            member_markdown = {}

            # Dict to track member response panels by member_id
            member_response_panels = {}

            # Consume the event stream, rebuilding the panel list per event
            for resp in stream_resp:
                # Lazily decide markdown rendering on the first event;
                # structured output (response_model) always disables it
                if team_markdown is None:
                    if markdown:
                        team_markdown = True
                    else:
                        team_markdown = False

                    if self.response_model is not None:
                        team_markdown = False

                # Team-level events: accumulate content/thinking/reasoning
                if isinstance(resp, tuple(get_args(TeamRunResponseEvent))):
                    if resp.event == TeamRunEvent.run_response_content:
                        if isinstance(resp.content, str):
                            _response_content += resp.content
                        if resp.thinking is not None:
                            _response_thinking += resp.thinking
                    if (
                        hasattr(resp, "extra_data")
                        and resp.extra_data is not None
                        and resp.extra_data.reasoning_steps is not None
                    ):
                        reasoning_steps = resp.extra_data.reasoning_steps

                    # Collect team tool calls, avoiding duplicates
                    if (
                        self.show_tool_calls
                        and isinstance(resp, ToolCallCompletedEvent)
                        and resp.tool
                    ):
                        tool = resp.tool
                        # Generate a unique ID for this tool call
                        if tool.tool_call_id:
                            tool_id = tool.tool_call_id
                        else:
                            tool_id = str(hash(str(tool)))
                        if tool_id not in processed_tool_calls:
                            processed_tool_calls.add(tool_id)
                            team_tool_calls.append(tool)

                # Collect member tool calls, avoiding duplicates
                if (
                    self.show_tool_calls
                    and hasattr(resp, "member_responses")
                    and resp.member_responses
                ):
                    for member_response in resp.member_responses:
                        # A member is keyed by agent_id (Agent) or team_id (Team)
                        member_id = None
                        if (
                            isinstance(member_response, RunResponse)
                            and member_response.agent_id is not None
                        ):
                            member_id = member_response.agent_id
                        elif (
                            isinstance(member_response, TeamRunResponse)
                            and member_response.team_id is not None
                        ):
                            member_id = member_response.team_id

                        if (
                            member_id
                            and hasattr(member_response, "tools")
                            and member_response.tools
                        ):
                            if member_id not in member_tool_calls:
                                member_tool_calls[member_id] = []

                            for tool in member_response.tools:
                                # Generate a unique ID for this tool call
                                if tool.tool_call_id:
                                    tool_id = tool.tool_call_id
                                else:
                                    tool_id = str(hash(str(tool)))
                                if tool_id not in processed_tool_calls:
                                    processed_tool_calls.add(tool_id)
                                    member_tool_calls[member_id].append(tool)

                response_content_stream: Union[str, Markdown] = _response_content
                # Escape special tags before markdown conversion
                if team_markdown:
                    escaped_content = escape_markdown_tags(
                        _response_content, tags_to_include_in_markdown
                    )
                    response_content_stream = Markdown(escaped_content)

                # Create new panels for each chunk
                panels = []

                if message and show_message:
                    render = True
                    # Convert message to a panel
                    message_content = get_text_from_message(message)
                    message_panel = create_panel(
                        content=Text(message_content, style="green"),
                        title="Message",
                        border_style="cyan",
                    )
                    panels.append(message_panel)

                if len(reasoning_steps) > 0 and show_reasoning:
                    render = True
                    # Create panels for reasoning steps
                    for i, step in enumerate(reasoning_steps, 1):
                        reasoning_panel = self._build_reasoning_step_panel(
                            i, step, show_full_reasoning
                        )
                        panels.append(reasoning_panel)

                if len(_response_thinking) > 0:
                    render = True
                    # Create panel for thinking
                    thinking_panel = create_panel(
                        content=Text(_response_thinking),
                        title=f"Thinking ({response_timer.elapsed:.1f}s)",
                        border_style="green",
                    )
                    panels.append(thinking_panel)
                elif _response_content == "":
                    # Keep showing status if no content yet
                    panels.append(status)

                # Process member responses and their tool calls
                for member_response in (
                    resp.member_responses if hasattr(resp, "member_responses") else []
                ):
                    member_id = None
                    member_name = "Team Member"
                    if (
                        isinstance(member_response, RunResponse)
                        and member_response.agent_id is not None
                    ):
                        member_id = member_response.agent_id
                        member_name = self._get_member_name(member_id)
                    elif (
                        isinstance(member_response, TeamRunResponse)
                        and member_response.team_id is not None
                    ):
                        member_id = member_response.team_id
                        member_name = self._get_member_name(member_id)

                    # If we have tool calls for this member, display them
                    if (
                        self.show_tool_calls
                        and member_id in member_tool_calls
                        and member_tool_calls[member_id]
                    ):
                        formatted_calls = format_tool_calls(
                            member_tool_calls[member_id]
                        )
                        if formatted_calls:
                            console_width = console.width if console else 80
                            # NOTE(review): width is console width PLUS 30 —
                            # confirm the sign is intended (borders/padding
                            # would normally be subtracted).
                            panel_width = console_width + 30

                            lines = []
                            for call in formatted_calls:
                                wrapped_call = textwrap.fill(
                                    f"• {call}",
                                    width=panel_width,
                                    subsequent_indent="  ",
                                )
                                lines.append(wrapped_call)

                            # Blank line between tool-call entries
                            tool_calls_text = "\n\n".join(lines)

                            member_tool_calls_panel = create_panel(
                                content=tool_calls_text,
                                title=f"{member_name} Tool Calls",
                                border_style="yellow",
                            )
                            panels.append(member_tool_calls_panel)

                    # Process member response content
                    if self.show_members_responses and member_id is not None:
                        show_markdown = False
                        if markdown:
                            show_markdown = True

                        member_response_content = self._parse_response_content(
                            member_response,
                            tags_to_include_in_markdown,
                            show_markdown=show_markdown,
                        )

                        member_response_panel = create_panel(
                            content=member_response_content,
                            title=f"{member_name} Response",
                            border_style="magenta",
                        )

                        panels.append(member_response_panel)

                        # Store for reference
                        if member_id is not None:
                            member_response_panels[member_id] = member_response_panel

                # Add team tool calls panel if available (before the team response)
                if self.show_tool_calls and team_tool_calls:
                    formatted_calls = format_tool_calls(team_tool_calls)
                    if formatted_calls:
                        console_width = console.width if console else 80
                        # NOTE(review): same width computation as above —
                        # console width PLUS 30; verify.
                        panel_width = console_width + 30

                        lines = []
                        # Create a set to track already added calls by their string representation
                        added_calls = set()
                        for call in formatted_calls:
                            if call not in added_calls:
                                added_calls.add(call)
                                # Wrap the call text to fit within the panel
                                wrapped_call = textwrap.fill(
                                    f"• {call}",
                                    width=panel_width,
                                    subsequent_indent="  ",
                                )
                                lines.append(wrapped_call)

                        # Join with blank lines between items
                        tool_calls_text = "\n\n".join(lines)

                        team_tool_calls_panel = create_panel(
                            content=tool_calls_text,
                            title="Team Tool Calls",
                            border_style="yellow",
                        )
                        panels.append(team_tool_calls_panel)

                # Add the team response panel at the end
                if len(_response_content) > 0:
                    render = True
                    # Create panel for response
                    response_panel = create_panel(
                        content=response_content_stream,
                        title=f"Response ({response_timer.elapsed:.1f}s)",
                        border_style="blue",
                    )
                    panels.append(response_panel)

                if render or len(panels) > 0:
                    live_console.update(Group(*panels))

            response_timer.stop()

            # Add citations
            # NOTE(review): ``resp`` is the last event from the loop above — if
            # the stream yielded no events this raises NameError; verify
            # ``self.run(stream=True)`` always yields at least one event.
            if (
                hasattr(resp, "citations")
                and resp.citations is not None
                and resp.citations.urls is not None
            ):
                md_content = "\n".join(
                    f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                    for i, citation in enumerate(resp.citations.urls)
                    if citation.url  # Only include citations with valid URLs
                )
                if md_content:  # Only create panel if there are citations
                    citations_panel = create_panel(
                        content=Markdown(md_content),
                        title="Citations",
                        border_style="green",
                    )
                    panels.append(citations_panel)
                    live_console.update(Group(*panels))

            # Surface memory / session-summary update notices (Memory v2 only)
            if self.memory is not None and isinstance(self.memory, Memory):
                if (
                    self.memory.memory_manager is not None
                    and self.memory.memory_manager.memories_updated
                ):
                    memory_panel = create_panel(
                        content=Text("Memories updated"),
                        title="Memories",
                        border_style="green",
                    )
                    panels.append(memory_panel)
                    live_console.update(Group(*panels))

                if (
                    self.memory.summary_manager is not None
                    and self.memory.summary_manager.summary_updated
                ):
                    summary_panel = create_panel(
                        content=Text("Session summary updated"),
                        title="Session Summary",
                        border_style="green",
                    )
                    panels.append(summary_panel)
                    live_console.update(Group(*panels))

            # Final update to remove the "Thinking..." status
            panels = [p for p in panels if not isinstance(p, Status)]

            # Recompute per-member markdown flags for the final render pass
            if markdown:
                for member in self.members:
                    if isinstance(member, Agent) and member.agent_id is not None:
                        member_markdown[member.agent_id] = True
                    if isinstance(member, Team) and member.team_id is not None:
                        member_markdown[member.team_id] = True

            # Members with structured output (response_model) never use markdown
            for member in self.members:
                if (
                    member.response_model is not None
                    and isinstance(member, Agent)
                    and member.agent_id is not None
                ):
                    member_markdown[member.agent_id] = False  # type: ignore
                if (
                    member.response_model is not None
                    and isinstance(member, Team)
                    and member.team_id is not None
                ):
                    member_markdown[member.team_id] = False  # type: ignore

            # Final panels assembly - we'll recreate the panels from scratch to ensure correct order
            final_panels = []

            # Start with the message
            if message and show_message:
                message_content = get_text_from_message(message)
                message_panel = create_panel(
                    content=Text(message_content, style="green"),
                    title="Message",
                    border_style="cyan",
                )
                final_panels.append(message_panel)

            # Add reasoning steps
            if reasoning_steps and show_reasoning:
                for i, step in enumerate(reasoning_steps, 1):
                    reasoning_panel = self._build_reasoning_step_panel(
                        i, step, show_full_reasoning
                    )
                    final_panels.append(reasoning_panel)

            # Add thinking panel if available
            if _response_thinking:
                thinking_panel = create_panel(
                    content=Text(_response_thinking),
                    title=f"Thinking ({response_timer.elapsed:.1f}s)",
                    border_style="green",
                )
                final_panels.append(thinking_panel)

            # Add member tool calls and responses in correct order
            for i, member_response in enumerate(
                self.run_response.member_responses if self.run_response else []
            ):
                member_id = None
                if (
                    isinstance(member_response, RunResponse)
                    and member_response.agent_id is not None
                ):
                    member_id = member_response.agent_id
                elif (
                    isinstance(member_response, TeamRunResponse)
                    and member_response.team_id is not None
                ):
                    member_id = member_response.team_id

                if member_id:
                    # First add tool calls if any
                    if (
                        self.show_tool_calls
                        and member_id in member_tool_calls
                        and member_tool_calls[member_id]
                    ):
                        formatted_calls = format_tool_calls(
                            member_tool_calls[member_id]
                        )
                        if formatted_calls:
                            console_width = console.width if console else 80
                            # NOTE(review): console width PLUS 30 again; verify.
                            panel_width = console_width + 30

                            lines = []
                            for call in formatted_calls:
                                wrapped_call = textwrap.fill(
                                    f"• {call}",
                                    width=panel_width,
                                    subsequent_indent="  ",
                                )
                                lines.append(wrapped_call)

                            tool_calls_text = "\n\n".join(lines)

                            member_name = self._get_member_name(member_id)
                            member_tool_calls_panel = create_panel(
                                content=tool_calls_text,
                                title=f"{member_name} Tool Calls",
                                border_style="yellow",
                            )
                            final_panels.append(member_tool_calls_panel)

                    # Add reasoning steps if any
                    reasoning_steps = []
                    if (
                        member_response.extra_data is not None
                        and member_response.extra_data.reasoning_steps is not None
                    ):
                        reasoning_steps = member_response.extra_data.reasoning_steps
                    if reasoning_steps and show_reasoning:
                        for j, step in enumerate(reasoning_steps, 1):
                            member_reasoning_panel = self._build_reasoning_step_panel(
                                j, step, show_full_reasoning, color="magenta"
                            )
                            final_panels.append(member_reasoning_panel)

                    # Then add response
                    show_markdown = False
                    if (
                        isinstance(member_response, RunResponse)
                        and member_response.agent_id is not None
                    ):
                        show_markdown = member_markdown.get(
                            member_response.agent_id, False
                        )
                    elif (
                        isinstance(member_response, TeamRunResponse)
                        and member_response.team_id is not None
                    ):
                        show_markdown = member_markdown.get(
                            member_response.team_id, False
                        )

                    member_response_content = self._parse_response_content(
                        member_response,
                        tags_to_include_in_markdown,
                        show_markdown=show_markdown,
                    )

                    member_name = "Team Member"
                    if (
                        isinstance(member_response, RunResponse)
                        and member_response.agent_id is not None
                    ):
                        member_name = self._get_member_name(member_response.agent_id)
                    elif (
                        isinstance(member_response, TeamRunResponse)
                        and member_response.team_id is not None
                    ):
                        member_name = self._get_member_name(member_response.team_id)

                    member_response_panel = create_panel(
                        content=member_response_content,
                        title=f"{member_name} Response",
                        border_style="magenta",
                    )
                    final_panels.append(member_response_panel)

                    # Add citations if any
                    if (
                        member_response.citations is not None
                        and member_response.citations.urls is not None
                    ):
                        md_content = "\n".join(
                            f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                            for i, citation in enumerate(member_response.citations.urls)
                            if citation.url  # Only include citations with valid URLs
                        )
                        if md_content:  # Only create panel if there are citations
                            citations_panel = create_panel(
                                content=Markdown(md_content),
                                title="Citations",
                                border_style="magenta",
                            )
                            final_panels.append(citations_panel)

            # Add team tool calls before team response
            if self.show_tool_calls and team_tool_calls:
                formatted_calls = format_tool_calls(team_tool_calls)
                if formatted_calls:
                    console_width = console.width if console else 80
                    panel_width = console_width + 30

                    lines = []
                    # Create a set to track already added calls by their string representation
                    added_calls = set()
                    for call in formatted_calls:
                        if call not in added_calls:
                            added_calls.add(call)
                            # Wrap the call text to fit within the panel
                            wrapped_call = textwrap.fill(
                                f"• {call}", width=panel_width, subsequent_indent="  "
                            )
                            lines.append(wrapped_call)

                    tool_calls_text = "\n\n".join(lines)

                    team_tool_calls_panel = create_panel(
                        content=tool_calls_text,
                        title="Team Tool Calls",
                        border_style="yellow",
                    )
                    final_panels.append(team_tool_calls_panel)

            # Add team response
            if _response_content:
                response_content_stream = _response_content
                if team_markdown:
                    escaped_content = escape_markdown_tags(
                        _response_content, tags_to_include_in_markdown
                    )
                    response_content_stream = Markdown(escaped_content)

                response_panel = create_panel(
                    content=response_content_stream,
                    title=f"Response ({response_timer.elapsed:.1f}s)",
                    border_style="blue",
                )
                final_panels.append(response_panel)

            # Add team citations
            if (
                hasattr(resp, "citations")
                and resp.citations is not None
                and resp.citations.urls is not None
            ):
                md_content = "\n".join(
                    f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                    for i, citation in enumerate(resp.citations.urls)
                    if citation.url  # Only include citations with valid URLs
                )
                if md_content:  # Only create panel if there are citations
                    citations_panel = create_panel(
                        content=Markdown(md_content),
                        title="Citations",
                        border_style="green",
                    )
                    final_panels.append(citations_panel)

            # Final update with correctly ordered panels
            live_console.update(Group(*final_panels))

    async def aprint_response(
        self,
        message: Optional[Union[List, Dict, str, Message]] = None,
        *,
        stream: bool = False,
        stream_intermediate_steps: bool = False,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        show_message: bool = True,
        show_reasoning: bool = True,
        show_full_reasoning: bool = False,
        console: Optional[Any] = None,
        tags_to_include_in_markdown: Optional[Set[str]] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        markdown: Optional[bool] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Run the team asynchronously and pretty-print the response.

        Dispatches to the streaming or batch renderer depending on ``stream``.
        When ``markdown`` is given it is also written back to ``self.markdown``;
        structured output (``self.response_model``) always disables streaming.
        """
        # Default to escaping the think-tags when the caller supplies no set.
        if not tags_to_include_in_markdown:
            tags_to_include_in_markdown = {"think", "thinking"}

        # Keep the instance-level markdown flag in sync with this call.
        if markdown is None:
            markdown = self.markdown
        else:
            self.markdown = markdown

        # A structured response model cannot be rendered incrementally.
        if self.response_model is not None:
            stream = False

        # Arguments shared by both render paths.
        shared_kwargs: Dict[str, Any] = dict(
            message=message,
            console=console,
            show_message=show_message,
            show_reasoning=show_reasoning,
            show_full_reasoning=show_full_reasoning,
            tags_to_include_in_markdown=tags_to_include_in_markdown,
            session_id=session_id,
            user_id=user_id,
            audio=audio,
            images=images,
            videos=videos,
            files=files,
            markdown=markdown,
            knowledge_filters=knowledge_filters,
            **kwargs,
        )

        if stream:
            await self._aprint_response_stream(
                stream_intermediate_steps=stream_intermediate_steps,
                **shared_kwargs,
            )
        else:
            await self._aprint_response(**shared_kwargs)

    async def _aprint_response(
        self,
        message: Optional[Union[List, Dict, str, Message]] = None,
        console: Optional[Any] = None,
        show_message: bool = True,
        show_reasoning: bool = True,
        show_full_reasoning: bool = False,
        tags_to_include_in_markdown: Optional[Set[str]] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        markdown: bool = False,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> None:
        """Run the team (non-streaming) and render the result with rich panels.

        Renders, in order: the user message, team reasoning steps, team
        thinking, then per member (if ``self.show_members_responses``) its
        reasoning / tool calls / response / citations, then team tool calls,
        the team response, team citations, and memory / session-summary
        update notices.

        Args mirror ``aprint_response``. ``markdown`` enables Markdown
        rendering of response content; it is forced off for any member (and
        for the team) that uses a structured ``response_model``.
        """
        import textwrap

        from rich.console import Group
        from rich.json import JSON
        from rich.live import Live
        from rich.markdown import Markdown
        from rich.status import Status
        from rich.text import Text

        from agno.utils.response import format_tool_calls

        if not tags_to_include_in_markdown:
            tags_to_include_in_markdown = {"think", "thinking"}

        with Live(console=console) as live_console:
            status = Status(
                "Thinking...", spinner="aesthetic", speed=0.4, refresh_per_second=10
            )
            live_console.update(status)

            response_timer = Timer()
            response_timer.start()
            # Panels to be rendered; the status spinner is stripped at the end
            panels = [status]
            # First render the message panel if the message is not None
            if message and show_message:
                # Convert message to a panel
                message_content = get_text_from_message(message)
                message_panel = create_panel(
                    content=Text(message_content, style="green"),
                    title="Message",
                    border_style="cyan",
                )
                panels.append(message_panel)
                live_console.update(Group(*panels))

            # Run the team without streaming; the timer measures the full call
            run_response: TeamRunResponse = await self.arun(  # type: ignore
                message=message,
                images=images,
                audio=audio,
                videos=videos,
                files=files,
                stream=False,
                session_id=session_id,
                user_id=user_id,
                knowledge_filters=knowledge_filters,
                **kwargs,
            )
            response_timer.stop()

            # Resolve Markdown rendering per member id and for the team itself
            team_markdown = False
            member_markdown: Dict[str, bool] = {}
            if markdown:
                for member in self.members:
                    if isinstance(member, Agent) and member.agent_id is not None:
                        member_markdown[member.agent_id] = True
                    if isinstance(member, Team) and member.team_id is not None:
                        member_markdown[member.team_id] = True
                team_markdown = True

            # Structured team output is rendered as JSON, never Markdown
            if self.response_model is not None:
                team_markdown = False

            # Members with a structured response model also render as JSON
            for member in self.members:
                if (
                    member.response_model is not None
                    and isinstance(member, Agent)
                    and member.agent_id is not None
                ):
                    member_markdown[member.agent_id] = False  # type: ignore
                if (
                    member.response_model is not None
                    and isinstance(member, Team)
                    and member.team_id is not None
                ):
                    member_markdown[member.team_id] = False  # type: ignore

            # Handle team-level reasoning steps
            reasoning_steps = []
            if (
                isinstance(run_response, TeamRunResponse)
                and run_response.extra_data is not None
                and run_response.extra_data.reasoning_steps is not None
            ):
                reasoning_steps = run_response.extra_data.reasoning_steps

            if len(reasoning_steps) > 0 and show_reasoning:
                # Create panels for reasoning steps
                for i, step in enumerate(reasoning_steps, 1):
                    reasoning_panel = self._build_reasoning_step_panel(
                        i, step, show_full_reasoning
                    )
                    panels.append(reasoning_panel)
                live_console.update(Group(*panels))

            if (
                isinstance(run_response, TeamRunResponse)
                and run_response.thinking is not None
            ):
                # Create panel for thinking
                thinking_panel = create_panel(
                    content=Text(run_response.thinking),
                    title=f"Thinking ({response_timer.elapsed:.1f}s)",
                    border_style="green",
                )
                panels.append(thinking_panel)
                live_console.update(Group(*panels))

            if isinstance(run_response, TeamRunResponse):
                # Handle member responses
                if self.show_members_responses:
                    for member_response in run_response.member_responses:
                        # Handle member reasoning
                        # NOTE(review): only RunResponse members contribute
                        # reasoning steps here; the streaming renderer also
                        # handles TeamRunResponse members — confirm intent.
                        reasoning_steps = []
                        if (
                            isinstance(member_response, RunResponse)
                            and member_response.extra_data is not None
                            and member_response.extra_data.reasoning_steps is not None
                        ):
                            reasoning_steps.extend(
                                member_response.extra_data.reasoning_steps
                            )

                        if len(reasoning_steps) > 0 and show_reasoning:
                            # Create panels for reasoning steps
                            for i, step in enumerate(reasoning_steps, 1):
                                member_reasoning_panel = (
                                    self._build_reasoning_step_panel(
                                        i, step, show_full_reasoning, color="magenta"
                                    )
                                )
                                panels.append(member_reasoning_panel)

                        # Add tool calls panel for member if available
                        if (
                            self.show_tool_calls
                            and hasattr(member_response, "tools")
                            and member_response.tools
                        ):
                            member_name = None
                            if (
                                isinstance(member_response, RunResponse)
                                and member_response.agent_id is not None
                            ):
                                member_name = self._get_member_name(
                                    member_response.agent_id
                                )
                            elif (
                                isinstance(member_response, TeamRunResponse)
                                and member_response.team_id is not None
                            ):
                                member_name = self._get_member_name(
                                    member_response.team_id
                                )

                            if member_name:
                                # Format tool calls
                                formatted_calls = format_tool_calls(
                                    member_response.tools
                                )
                                if formatted_calls:
                                    console_width = console.width if console else 80
                                    panel_width = console_width + 30

                                    lines = []
                                    for call in formatted_calls:
                                        # Wrap each call; continuation lines
                                        # are indented under the bullet
                                        wrapped_call = textwrap.fill(
                                            f"• {call}",
                                            width=panel_width,
                                            subsequent_indent="  ",
                                        )
                                        lines.append(wrapped_call)

                                    tool_calls_text = "\n\n".join(lines)

                                    member_tool_calls_panel = create_panel(
                                        content=tool_calls_text,
                                        title=f"{member_name} Tool Calls",
                                        border_style="yellow",
                                    )
                                    panels.append(member_tool_calls_panel)
                                    live_console.update(Group(*panels))

                        show_markdown = False
                        if (
                            isinstance(member_response, RunResponse)
                            and member_response.agent_id is not None
                        ):
                            show_markdown = member_markdown.get(
                                member_response.agent_id, False
                            )
                        elif (
                            isinstance(member_response, TeamRunResponse)
                            and member_response.team_id is not None
                        ):
                            show_markdown = member_markdown.get(
                                member_response.team_id, False
                            )

                        member_response_content: Union[str, JSON, Markdown] = (
                            self._parse_response_content(
                                member_response,
                                tags_to_include_in_markdown,
                                show_markdown=show_markdown,
                            )
                        )

                        # Create panel for member response. Initialize to None
                        # so a member without an agent_id/team_id is skipped
                        # instead of raising NameError (or re-appending the
                        # previous member's panel) on the append below.
                        member_response_panel = None
                        if (
                            isinstance(member_response, RunResponse)
                            and member_response.agent_id is not None
                        ):
                            member_response_panel = create_panel(
                                content=member_response_content,
                                title=f"{self._get_member_name(member_response.agent_id)} Response",
                                border_style="magenta",
                            )
                        elif (
                            isinstance(member_response, TeamRunResponse)
                            and member_response.team_id is not None
                        ):
                            member_response_panel = create_panel(
                                content=member_response_content,
                                title=f"{self._get_member_name(member_response.team_id)} Response",
                                border_style="magenta",
                            )
                        if member_response_panel is not None:
                            panels.append(member_response_panel)

                        if (
                            member_response.citations is not None
                            and member_response.citations.urls is not None
                        ):
                            md_content = "\n".join(
                                f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                                for i, citation in enumerate(
                                    member_response.citations.urls
                                )
                                if citation.url  # Only include citations with valid URLs
                            )
                            if md_content:
                                citations_panel = create_panel(
                                    content=Markdown(md_content),
                                    title="Citations",
                                    border_style="magenta",
                                )
                                panels.append(citations_panel)

                    live_console.update(Group(*panels))

                # Add team level tool calls panel if available
                if self.show_tool_calls and run_response.tools:
                    formatted_calls = format_tool_calls(run_response.tools)
                    if formatted_calls:
                        console_width = console.width if console else 80
                        # Allow for panel borders and padding
                        panel_width = console_width + 30

                        lines = []
                        for call in formatted_calls:
                            # Wrap the call text to fit within the panel
                            wrapped_call = textwrap.fill(
                                f"• {call}", width=panel_width, subsequent_indent="  "
                            )
                            lines.append(wrapped_call)

                        tool_calls_text = "\n\n".join(lines)

                        team_tool_calls_panel = create_panel(
                            content=tool_calls_text,
                            title="Team Tool Calls",
                            border_style="yellow",
                        )
                        panels.append(team_tool_calls_panel)
                        live_console.update(Group(*panels))

                response_content_batch: Union[str, JSON, Markdown] = (
                    self._parse_response_content(
                        run_response,
                        tags_to_include_in_markdown,
                        show_markdown=team_markdown,
                    )
                )

                # Create panel for response
                response_panel = create_panel(
                    content=response_content_batch,
                    title=f"Response ({response_timer.elapsed:.1f}s)",
                    border_style="blue",
                )
                panels.append(response_panel)

                # Add citations
                if (
                    run_response.citations is not None
                    and run_response.citations.urls is not None
                ):
                    md_content = "\n".join(
                        f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                        for i, citation in enumerate(run_response.citations.urls)
                        if citation.url  # Only include citations with valid URLs
                    )
                    if md_content:  # Only create panel if there are citations
                        citations_panel = create_panel(
                            content=Markdown(md_content),
                            title="Citations",
                            border_style="green",
                        )
                        panels.append(citations_panel)

                # Surface v2 memory side effects (memories / session summary)
                if self.memory is not None and isinstance(self.memory, Memory):
                    if (
                        self.memory.memory_manager is not None
                        and self.memory.memory_manager.memories_updated
                    ):
                        memory_panel = create_panel(
                            content=Text("Memories updated"),
                            title="Memories",
                            border_style="green",
                        )
                        panels.append(memory_panel)

                    if (
                        self.memory.summary_manager is not None
                        and self.memory.summary_manager.summary_updated
                    ):
                        summary_panel = create_panel(
                            content=Text("Session summary updated"),
                            title="Session Summary",
                            border_style="green",
                        )
                        panels.append(summary_panel)

            # Final update to remove the "Thinking..." status
            panels = [p for p in panels if not isinstance(p, Status)]
            live_console.update(Group(*panels))

    async def _aprint_response_stream(
        self,
        message: Optional[Union[List, Dict, str, Message]] = None,
        console: Optional[Any] = None,
        show_message: bool = True,
        show_reasoning: bool = True,
        show_full_reasoning: bool = False,
        tags_to_include_in_markdown: Optional[Set[str]] = None,
        session_id: Optional[str] = None,
        user_id: Optional[str] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        markdown: bool = False,
        stream_intermediate_steps: bool = False,  # type: ignore
        **kwargs: Any,
    ) -> None:
        import textwrap

        from rich.console import Group
        from rich.live import Live
        from rich.markdown import Markdown
        from rich.status import Status
        from rich.text import Text

        if not tags_to_include_in_markdown:
            tags_to_include_in_markdown = {"think", "thinking"}

        stream_intermediate_steps = (
            True  # With streaming print response, we need to stream intermediate steps
        )

        self.run_response = cast(TeamRunResponse, self.run_response)

        _response_content: str = ""
        _response_thinking: str = ""
        reasoning_steps: List[ReasoningStep] = []

        # Track tool calls by member and team
        member_tool_calls = {}  # type: ignore
        team_tool_calls: List[ToolExecution] = []

        # Track processed tool calls to avoid duplicates
        processed_tool_calls = set()

        # Initialize final_panels here
        final_panels = []  # type: ignore

        with Live(console=console) as live_console:
            status = Status(
                "Thinking...", spinner="aesthetic", speed=0.4, refresh_per_second=10
            )
            live_console.update(status)
            response_timer = Timer()
            response_timer.start()
            # Flag which indicates if the panels should be rendered
            render = False
            # Panels to be rendered
            panels = [status]
            # First render the message panel if the message is not None
            if message and show_message:
                render = True
                # Convert message to a panel
                message_content = get_text_from_message(message)
                message_panel = create_panel(
                    content=Text(message_content, style="green"),
                    title="Message",
                    border_style="cyan",
                )
                panels.append(message_panel)
            if render:
                live_console.update(Group(*panels))

            # Get response from the team
            stream_resp = await self.arun(  # type: ignore
                message=message,
                audio=audio,
                images=images,
                videos=videos,
                files=files,
                stream=True,
                stream_intermediate_steps=stream_intermediate_steps,
                session_id=session_id,
                user_id=user_id,
                **kwargs,
            )
            team_markdown = None
            member_markdown = {}

            async for resp in stream_resp:
                if team_markdown is None:
                    if markdown:
                        team_markdown = True
                    else:
                        team_markdown = False

                    if self.response_model is not None:
                        team_markdown = False

                if isinstance(resp, tuple(get_args(TeamRunResponseEvent))):
                    if resp.event == TeamRunEvent.run_response_content:
                        if isinstance(resp.content, str):
                            _response_content += resp.content
                        if resp.thinking is not None:
                            _response_thinking += resp.thinking
                    if (
                        hasattr(resp, "extra_data")
                        and resp.extra_data is not None
                        and resp.extra_data.reasoning_steps is not None
                    ):
                        reasoning_steps = resp.extra_data.reasoning_steps

                    # Collect team tool calls, avoiding duplicates
                    if (
                        self.show_tool_calls
                        and isinstance(resp, ToolCallCompletedEvent)
                        and resp.tool
                    ):
                        tool = resp.tool
                        # Generate a unique ID for this tool call
                        if tool.tool_call_id is not None:
                            tool_id = tool.tool_call_id
                        else:
                            tool_id = str(hash(str(tool)))
                        if tool_id not in processed_tool_calls:
                            processed_tool_calls.add(tool_id)
                            team_tool_calls.append(tool)

                # Collect member tool calls, avoiding duplicates
                if (
                    self.show_tool_calls
                    and hasattr(resp, "member_responses")
                    and resp.member_responses
                ):
                    for member_response in resp.member_responses:
                        member_id = None
                        if (
                            isinstance(member_response, RunResponse)
                            and member_response.agent_id is not None
                        ):
                            member_id = member_response.agent_id
                        elif (
                            isinstance(member_response, TeamRunResponse)
                            and member_response.team_id is not None
                        ):
                            member_id = member_response.team_id

                        if (
                            member_id
                            and hasattr(member_response, "tools")
                            and member_response.tools
                        ):
                            if member_id not in member_tool_calls:
                                member_tool_calls[member_id] = []

                            for tool in member_response.tools:
                                if tool.tool_call_id is not None:
                                    tool_id = tool.tool_call_id
                                else:
                                    tool_id = str(hash(str(tool)))
                                if tool_id not in processed_tool_calls:
                                    processed_tool_calls.add(tool_id)
                                    member_tool_calls[member_id].append(tool)

                response_content_stream: Union[str, Markdown] = _response_content
                # Escape special tags before markdown conversion
                if team_markdown:
                    escaped_content = escape_markdown_tags(
                        _response_content, tags_to_include_in_markdown
                    )
                    response_content_stream = Markdown(escaped_content)

                # Create new panels for each chunk
                panels = [status]

                if message and show_message:
                    render = True
                    # Convert message to a panel
                    message_content = get_text_from_message(message)
                    message_panel = create_panel(
                        content=Text(message_content, style="green"),
                        title="Message",
                        border_style="cyan",
                    )
                    panels.append(message_panel)
                if render:
                    live_console.update(Group(*panels))

                if len(reasoning_steps) > 0 and show_reasoning:
                    render = True
                    # Create panels for reasoning steps
                    for i, step in enumerate(reasoning_steps, 1):
                        reasoning_panel = self._build_reasoning_step_panel(
                            i, step, show_full_reasoning
                        )
                        panels.append(reasoning_panel)
                if render:
                    live_console.update(Group(*panels))

                if len(_response_thinking) > 0:
                    render = True
                    # Create panel for thinking
                    thinking_panel = create_panel(
                        content=Text(_response_thinking),
                        title=f"Thinking ({response_timer.elapsed:.1f}s)",
                        border_style="green",
                    )
                    panels.append(thinking_panel)
                if render:
                    live_console.update(Group(*panels))

                # Add tool calls panel if available
                if (
                    self.show_tool_calls
                    and resp is not None
                    and self.run_response.formatted_tool_calls
                ):
                    render = True
                    # Create bullet points for each tool call
                    tool_calls_content = Text()
                    # Use a set to track already processed tool calls
                    added_tool_calls = set()
                    for tool_call in self.run_response.formatted_tool_calls:
                        if tool_call not in added_tool_calls:
                            added_tool_calls.add(tool_call)
                            tool_calls_content.append(f"• {tool_call}\n")

                    tool_calls_panel = create_panel(
                        content=tool_calls_content.plain.rstrip(),
                        title="Tool Calls",
                        border_style="yellow",
                    )
                    panels.append(tool_calls_panel)

                if len(_response_content) > 0:
                    render = True
                    # Create panel for response
                    response_panel = create_panel(
                        content=response_content_stream,
                        title=f"Response ({response_timer.elapsed:.1f}s)",
                        border_style="blue",
                    )
                    panels.append(response_panel)
                if render:
                    live_console.update(Group(*panels))
            response_timer.stop()

            # Add citations
            if (
                hasattr(resp, "citations")
                and resp.citations is not None
                and resp.citations.urls is not None
            ):
                md_content = "\n".join(
                    f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                    for i, citation in enumerate(resp.citations.urls)
                    if citation.url  # Only include citations with valid URLs
                )
                if md_content:  # Only create panel if there are citations
                    citations_panel = create_panel(
                        content=Markdown(md_content),
                        title="Citations",
                        border_style="green",
                    )
                    panels.append(citations_panel)
                    live_console.update(Group(*panels))

            if self.memory is not None and isinstance(self.memory, Memory):
                if (
                    self.memory.memory_manager is not None
                    and self.memory.memory_manager.memories_updated
                ):
                    memory_panel = create_panel(
                        content=Text("Memories updated"),
                        title="Memories",
                        border_style="green",
                    )
                    panels.append(memory_panel)
                    live_console.update(Group(*panels))

                if (
                    self.memory.summary_manager is not None
                    and self.memory.summary_manager.summary_updated
                ):
                    summary_panel = create_panel(
                        content=Text("Session summary updated"),
                        title="Session Summary",
                        border_style="green",
                    )
                    panels.append(summary_panel)
                    live_console.update(Group(*panels))

            # Final update to remove the "Thinking..." status
            panels = [p for p in panels if not isinstance(p, Status)]

            if markdown:
                for member in self.members:
                    if isinstance(member, Agent) and member.agent_id is not None:
                        member_markdown[member.agent_id] = True  # type: ignore
                    if isinstance(member, Team) and member.team_id is not None:
                        member_markdown[member.team_id] = True  # type: ignore

            for member in self.members:
                if (
                    member.response_model is not None
                    and isinstance(member, Agent)
                    and member.agent_id is not None
                ):
                    member_markdown[member.agent_id] = False  # type: ignore
                if (
                    member.response_model is not None
                    and isinstance(member, Team)
                    and member.team_id is not None
                ):
                    member_markdown[member.team_id] = False  # type: ignore

            # Final panels assembly - we'll recreate the panels from scratch to ensure correct order
            final_panels = []

            # Start with the message
            if message and show_message:
                message_content = get_text_from_message(message)
                message_panel = create_panel(
                    content=Text(message_content, style="green"),
                    title="Message",
                    border_style="cyan",
                )
                final_panels.append(message_panel)

            # Add reasoning steps
            if reasoning_steps and show_reasoning:
                for i, step in enumerate(reasoning_steps, 1):
                    reasoning_panel = self._build_reasoning_step_panel(
                        i, step, show_full_reasoning
                    )
                    final_panels.append(reasoning_panel)

            # Add thinking panel if available
            if _response_thinking:
                thinking_panel = create_panel(
                    content=Text(_response_thinking),
                    title=f"Thinking ({response_timer.elapsed:.1f}s)",
                    border_style="green",
                )
                final_panels.append(thinking_panel)

            # Add member tool calls and responses in correct order
            for i, member_response in enumerate(
                self.run_response.member_responses if self.run_response else []
            ):
                member_id = None
                if (
                    isinstance(member_response, RunResponse)
                    and member_response.agent_id is not None
                ):
                    member_id = member_response.agent_id
                elif (
                    isinstance(member_response, TeamRunResponse)
                    and member_response.team_id is not None
                ):
                    member_id = member_response.team_id

                if member_id:
                    # First add tool calls if any
                    if (
                        self.show_tool_calls
                        and member_id in member_tool_calls
                        and member_tool_calls[member_id]
                    ):
                        formatted_calls = format_tool_calls(
                            member_tool_calls[member_id]
                        )
                        if formatted_calls:
                            console_width = console.width if console else 80
                            panel_width = console_width + 30

                            lines = []
                            # Create a set to track already added calls by their string representation
                            added_calls = set()
                            for call in formatted_calls:
                                if call not in added_calls:
                                    added_calls.add(call)
                                    # Wrap the call text to fit within the panel
                                    wrapped_call = textwrap.fill(
                                        f"• {call}",
                                        width=panel_width,
                                        subsequent_indent="  ",
                                    )
                                    lines.append(wrapped_call)

                            tool_calls_text = "\n\n".join(lines)

                            member_name = self._get_member_name(member_id)
                            member_tool_calls_panel = create_panel(
                                content=tool_calls_text,
                                title=f"{member_name} Tool Calls",
                                border_style="yellow",
                            )
                            final_panels.append(member_tool_calls_panel)

                    # Add reasoning steps if any
                    reasoning_steps = []
                    if (
                        member_response.extra_data is not None
                        and member_response.extra_data.reasoning_steps is not None
                    ):
                        reasoning_steps = member_response.extra_data.reasoning_steps
                    if reasoning_steps and show_reasoning:
                        for j, step in enumerate(reasoning_steps, 1):
                            member_reasoning_panel = self._build_reasoning_step_panel(
                                j, step, show_full_reasoning, color="magenta"
                            )
                            final_panels.append(member_reasoning_panel)

                    # Then add response
                    show_markdown = False
                    if (
                        isinstance(member_response, RunResponse)
                        and member_response.agent_id is not None
                    ):
                        show_markdown = member_markdown.get(
                            member_response.agent_id, False
                        )
                    elif (
                        isinstance(member_response, TeamRunResponse)
                        and member_response.team_id is not None
                    ):
                        show_markdown = member_markdown.get(
                            member_response.team_id, False
                        )

                    member_response_content = self._parse_response_content(
                        member_response,
                        tags_to_include_in_markdown,
                        show_markdown=show_markdown,
                    )

                    member_name = "Team Member"
                    if (
                        isinstance(member_response, RunResponse)
                        and member_response.agent_id is not None
                    ):
                        member_name = self._get_member_name(member_response.agent_id)
                    elif (
                        isinstance(member_response, TeamRunResponse)
                        and member_response.team_id is not None
                    ):
                        member_name = self._get_member_name(member_response.team_id)

                    member_response_panel = create_panel(
                        content=member_response_content,
                        title=f"{member_name} Response",
                        border_style="magenta",
                    )
                    final_panels.append(member_response_panel)

                    # Add citations if any
                    if (
                        member_response.citations is not None
                        and member_response.citations.urls is not None
                    ):
                        md_content = "\n".join(
                            f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                            for i, citation in enumerate(member_response.citations.urls)
                            if citation.url  # Only include citations with valid URLs
                        )
                        if md_content:  # Only create panel if there are citations
                            citations_panel = create_panel(
                                content=Markdown(md_content),
                                title="Citations",
                                border_style="magenta",
                            )
                            final_panels.append(citations_panel)

            # Add team tool calls before team response
            if self.show_tool_calls and team_tool_calls:
                formatted_calls = format_tool_calls(team_tool_calls)
                if formatted_calls:
                    console_width = console.width if console else 80
                    panel_width = console_width + 30

                    lines = []
                    # Create a set to track already added calls by their string representation
                    added_calls = set()
                    for call in formatted_calls:
                        if call not in added_calls:
                            added_calls.add(call)
                            # Wrap the call text to fit within the panel
                            wrapped_call = textwrap.fill(
                                f"• {call}", width=panel_width, subsequent_indent="  "
                            )
                            lines.append(wrapped_call)

                    tool_calls_text = "\n\n".join(lines)

                    team_tool_calls_panel = create_panel(
                        content=tool_calls_text,
                        title="Team Tool Calls",
                        border_style="yellow",
                    )
                    final_panels.append(team_tool_calls_panel)

            # Add team response
            if _response_content:
                response_content_stream = _response_content
                if team_markdown:
                    escaped_content = escape_markdown_tags(
                        _response_content, tags_to_include_in_markdown
                    )
                    response_content_stream = Markdown(escaped_content)

                response_panel = create_panel(
                    content=response_content_stream,
                    title=f"Response ({response_timer.elapsed:.1f}s)",
                    border_style="blue",
                )
                final_panels.append(response_panel)

            # Add team citations
            if (
                hasattr(resp, "citations")
                and resp.citations is not None
                and resp.citations.urls is not None
            ):
                md_content = "\n".join(
                    f"{i + 1}. [{citation.title or citation.url}]({citation.url})"
                    for i, citation in enumerate(resp.citations.urls)
                    if citation.url  # Only include citations with valid URLs
                )
                if md_content:  # Only create panel if there are citations
                    citations_panel = create_panel(
                        content=Markdown(md_content),
                        title="Citations",
                        border_style="green",
                    )
                    final_panels.append(citations_panel)

            # Final update with correctly ordered panels
            live_console.update(Group(*final_panels))

    def _build_reasoning_step_panel(
        self,
        step_idx: int,
        step: ReasoningStep,
        show_full_reasoning: bool = False,
        color: str = "green",
    ):
        """Render a single reasoning step as a rich panel.

        Always shows the step title, action and result when present; when
        ``show_full_reasoning`` is True, the detailed reasoning text and the
        confidence score are appended as well.
        """
        from rich.text import Text

        content = Text.assemble()

        # Summary fields, always visible
        if step.title is not None:
            content.append(f"{step.title}\n", "bold")
        if step.action is not None:
            content.append(
                Text.from_markup(f"[bold]Action:[/bold] {step.action}\n", style="dim")
            )
        if step.result is not None:
            content.append(Text.from_markup(step.result, style="dim"))

        # Detail fields, only on request
        if show_full_reasoning:
            if step.reasoning is not None:
                content.append(
                    Text.from_markup(
                        f"\n[bold]Reasoning:[/bold] {step.reasoning}", style="dim"
                    )
                )
            if step.confidence is not None:
                content.append(
                    Text.from_markup(
                        f"\n[bold]Confidence:[/bold] {step.confidence}", style="dim"
                    )
                )

        return create_panel(
            content=content,
            title=f"Reasoning step {step_idx}",
            border_style=color,
        )

    def _get_member_name(self, entity_id: str) -> str:
        """Resolve a member's display name from its agent/team id.

        Falls back to the id itself when no matching member is found or the
        matching member has no name set.
        """
        for candidate in self.members:
            if isinstance(candidate, Agent) and candidate.agent_id == entity_id:
                return candidate.name or entity_id
            if isinstance(candidate, Team) and candidate.team_id == entity_id:
                return candidate.name or entity_id
        return entity_id

    def _parse_response_content(
        self,
        run_response: Union[TeamRunResponse, RunResponse],
        tags_to_include_in_markdown: Set[str],
        show_markdown: bool = True,
    ) -> Any:
        """Convert a run response's content into a rich-renderable object.

        - ``str`` content: rendered as Markdown (with special tags escaped)
          when ``show_markdown`` is True, otherwise returned as a plain string.
        - ``BaseModel`` content: pretty-printed as JSON.
        - Anything else: serialized via ``json.dumps`` and rendered as JSON.

        Returns:
            A rich renderable (``Markdown``/``JSON``) or a plain string.
            If JSON conversion fails, the stringified content is returned as a
            safe fallback (previously the function implicitly returned None,
            which the panel renderer cannot display).
        """
        from rich.json import JSON
        from rich.markdown import Markdown

        if isinstance(run_response.content, str):
            if show_markdown:
                escaped_content = escape_markdown_tags(
                    run_response.content, tags_to_include_in_markdown
                )
                return Markdown(escaped_content)
            else:
                return run_response.get_content_as_string(indent=4)
        elif isinstance(run_response.content, BaseModel):
            try:
                return JSON(
                    run_response.content.model_dump_json(exclude_none=True), indent=2
                )
            except Exception as e:
                log_warning(f"Failed to convert response to JSON: {e}")
        else:
            try:
                return JSON(json.dumps(run_response.content), indent=4)
            except Exception as e:
                log_warning(f"Failed to convert response to JSON: {e}")
        # Fallback: never hand None to the panel renderer
        return str(run_response.content)

    def cli_app(
        self,
        message: Optional[str] = None,
        user: str = "User",
        emoji: str = ":sunglasses:",
        stream: bool = False,
        markdown: bool = False,
        exit_on: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> None:
        """Run an interactive terminal REPL against this team.

        If ``message`` is given, a response to it is printed first. The user is
        then prompted in a loop until one of the ``exit_on`` keywords
        (default: "exit", "quit", "bye") is entered.
        """
        from rich.prompt import Prompt

        if message:
            self.print_response(
                message=message, stream=stream, markdown=markdown, **kwargs
            )

        exit_keywords = exit_on or ["exit", "quit", "bye"]
        while True:
            user_input = Prompt.ask(f"[bold] {emoji} {user} [/bold]")
            if user_input in exit_keywords:
                return

            self.print_response(
                message=user_input, stream=stream, markdown=markdown, **kwargs
            )

    ###########################################################################
    # Helpers
    ###########################################################################

    def _handle_reasoning(
        self, run_response: TeamRunResponse, run_messages: RunMessages
    ) -> None:
        """Run the reasoning flow (if enabled), discarding all streamed events."""
        if not (self.reasoning or self.reasoning_model is not None):
            return
        # Exhaust the generator purely for its side effects
        deque(
            self._reason(run_response=run_response, run_messages=run_messages),
            maxlen=0,
        )

    def _handle_reasoning_stream(
        self, run_response: TeamRunResponse, run_messages: RunMessages
    ) -> Iterator[TeamRunResponseEvent]:
        """Run the reasoning flow (if enabled), yielding each event as it occurs."""
        if self.reasoning or self.reasoning_model is not None:
            for event in self._reason(
                run_response=run_response, run_messages=run_messages
            ):
                yield event

    async def _ahandle_reasoning(
        self, run_response: TeamRunResponse, run_messages: RunMessages
    ) -> None:
        """Run the async reasoning flow (if enabled), discarding all events."""
        if not (self.reasoning or self.reasoning_model is not None):
            return
        # Drain the async generator purely for its side effects
        async for _ in self._areason(
            run_response=run_response, run_messages=run_messages
        ):
            pass

    async def _ahandle_reasoning_stream(
        self, run_response: TeamRunResponse, run_messages: RunMessages
    ) -> AsyncIterator[TeamRunResponseEvent]:
        """Run the async reasoning flow (if enabled), re-yielding each event."""
        if not (self.reasoning or self.reasoning_model is not None):
            return
        async for event in self._areason(
            run_response=run_response, run_messages=run_messages
        ):
            yield event

    def _calculate_session_metrics(self, messages: List[Message]) -> SessionMetrics:
        """Sum the metrics of the team leader's assistant messages."""
        role = "assistant" if self.model is None else self.model.assistant_message_role

        totals = SessionMetrics()
        for message in messages:
            if message.role == role and message.metrics is not None:
                totals += message.metrics
        return totals

    def _calculate_full_team_session_metrics(
        self, messages: List[Message]
    ) -> SessionMetrics:
        """Combine the team's session metrics with every member's assistant-message metrics."""
        base = self.session_metrics or self._calculate_session_metrics(messages)
        # Work on a copy so the stored session metrics are not mutated in place
        totals = replace(base)
        role = "assistant" if self.model is None else self.model.assistant_message_role

        # Only members that actually ran have memory populated
        for member in self.members:
            if member.memory is not None and isinstance(member.memory, AgentMemory):
                for message in member.memory.messages:
                    if message.role == role and message.metrics is not None:
                        totals += message.metrics
        return totals

    def _aggregate_metrics_from_messages(
        self, messages: List[Message]
    ) -> Dict[str, Any]:
        """Collect per-field metric values from the assistant messages.

        Returns:
            A plain dict mapping each metric field name to the list of values
            observed across messages. The non-aggregatable "timer" field is
            skipped, as are None values.
        """
        aggregated_metrics: Dict[str, Any] = defaultdict(list)
        assistant_message_role = (
            self.model.assistant_message_role if self.model is not None else "assistant"
        )
        for m in messages:
            if m.role == assistant_message_role and m.metrics is not None:
                for k, v in asdict(m.metrics).items():  # type: ignore
                    # "timer" is a timing object, not an aggregatable metric value
                    if k == "timer":
                        continue
                    if v is not None:
                        aggregated_metrics[k].append(v)
        # Convert to a plain dict so callers don't inherit defaultdict semantics.
        # (The previous `is not None` guard was dead code: a defaultdict is never None.)
        return dict(aggregated_metrics)

    def _get_reasoning_agent(self, reasoning_model: Model) -> Optional[Agent]:
        """Build a bare reasoning Agent that mirrors this team's observability flags."""
        reasoning_agent = Agent(
            model=reasoning_model,
            monitoring=self.monitoring,
            telemetry=self.telemetry,
            debug_mode=self.debug_mode,
        )
        return reasoning_agent

    def _format_reasoning_step_content(
        self, run_response: TeamRunResponse, reasoning_step: ReasoningStep
    ) -> str:
        """Return the run response's reasoning_content extended with one step.

        Renders the step's title/reasoning/action/result as markdown-ish text
        and appends it to whatever reasoning_content has accumulated so far.
        Does not mutate ``run_response``.
        """
        parts: List[str] = []
        if reasoning_step.title:
            parts.append(f"## {reasoning_step.title}\n")
        if reasoning_step.reasoning:
            parts.append(f"{reasoning_step.reasoning}\n")
        if reasoning_step.action:
            parts.append(f"Action: {reasoning_step.action}\n")
        if reasoning_step.result:
            parts.append(f"Result: {reasoning_step.result}\n")
        # Blank line separates consecutive steps
        step_content = "".join(parts) + "\n"

        # Start from whatever reasoning_content has accumulated so far
        existing = ""
        if getattr(run_response, "reasoning_content", None):
            existing = run_response.reasoning_content

        return existing + step_content

    def _reason(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
    ) -> Iterator[TeamRunResponseEvent]:
        if self.stream_intermediate_steps:
            yield create_team_reasoning_started_event(from_run_response=run_response)

        use_default_reasoning = False

        # Get the reasoning model
        reasoning_model: Optional[Model] = self.reasoning_model
        reasoning_model_provided = reasoning_model is not None
        if reasoning_model is None and self.model is not None:
            from copy import deepcopy

            reasoning_model = deepcopy(self.model)
        if reasoning_model is None:
            log_warning(
                "Reasoning error. Reasoning model is None, continuing regular session..."
            )
            return

        # If a reasoning model is provided, use it to generate reasoning
        if reasoning_model_provided:
            from agno.reasoning.azure_ai_foundry import is_ai_foundry_reasoning_model
            from agno.reasoning.deepseek import is_deepseek_reasoning_model
            from agno.reasoning.groq import is_groq_reasoning_model
            from agno.reasoning.helpers import get_reasoning_agent
            from agno.reasoning.ollama import is_ollama_reasoning_model
            from agno.reasoning.openai import is_openai_reasoning_model

            reasoning_agent = self.reasoning_agent or get_reasoning_agent(
                reasoning_model=reasoning_model, monitoring=self.monitoring
            )
            is_deepseek = is_deepseek_reasoning_model(reasoning_model)
            is_groq = is_groq_reasoning_model(reasoning_model)
            is_openai = is_openai_reasoning_model(reasoning_model)
            is_ollama = is_ollama_reasoning_model(reasoning_model)
            is_ai_foundry = is_ai_foundry_reasoning_model(reasoning_model)

            if is_deepseek or is_groq or is_openai or is_ollama or is_ai_foundry:
                reasoning_message: Optional[Message] = None
                if is_deepseek:
                    from agno.reasoning.deepseek import get_deepseek_reasoning

                    log_debug("Starting DeepSeek Reasoning", center=True, symbol="=")
                    reasoning_message = get_deepseek_reasoning(
                        reasoning_agent=reasoning_agent,
                        messages=run_messages.get_input_messages(),
                    )
                elif is_groq:
                    from agno.reasoning.groq import get_groq_reasoning

                    log_debug("Starting Groq Reasoning", center=True, symbol="=")
                    reasoning_message = get_groq_reasoning(
                        reasoning_agent=reasoning_agent,
                        messages=run_messages.get_input_messages(),
                    )
                elif is_openai:
                    from agno.reasoning.openai import get_openai_reasoning

                    log_debug("Starting OpenAI Reasoning", center=True, symbol="=")
                    reasoning_message = get_openai_reasoning(
                        reasoning_agent=reasoning_agent,
                        messages=run_messages.get_input_messages(),
                    )
                elif is_ollama:
                    from agno.reasoning.ollama import get_ollama_reasoning

                    log_debug("Starting Ollama Reasoning", center=True, symbol="=")
                    reasoning_message = get_ollama_reasoning(
                        reasoning_agent=reasoning_agent,
                        messages=run_messages.get_input_messages(),
                    )
                elif is_ai_foundry:
                    from agno.reasoning.azure_ai_foundry import get_ai_foundry_reasoning

                    log_debug(
                        "Starting Azure AI Foundry Reasoning", center=True, symbol="="
                    )
                    reasoning_message = get_ai_foundry_reasoning(
                        reasoning_agent=reasoning_agent,
                        messages=run_messages.get_input_messages(),
                    )

                if reasoning_message is None:
                    log_warning(
                        "Reasoning error. Reasoning response is None, continuing regular session..."
                    )
                    return

                run_messages.messages.append(reasoning_message)
                # Add reasoning step to the Agent's run_response
                update_run_response_with_reasoning(
                    run_response=run_response,
                    reasoning_steps=[ReasoningStep(result=reasoning_message.content)],
                    reasoning_agent_messages=[reasoning_message],
                )
                if self.stream_intermediate_steps:
                    yield create_team_reasoning_completed_event(
                        from_run_response=run_response,
                        content=ReasoningSteps(
                            reasoning_steps=[
                                ReasoningStep(result=reasoning_message.content)
                            ]
                        ),
                        content_type=ReasoningSteps.__name__,
                    )
            else:
                log_warning(
                    f"Reasoning model: {reasoning_model.__class__.__name__} is not a native reasoning model, defaulting to manual Chain-of-Thought reasoning"
                )
                use_default_reasoning = True
        # If no reasoning model is provided, use default reasoning
        else:
            use_default_reasoning = True

        if use_default_reasoning:
            from agno.reasoning.default import get_default_reasoning_agent
            from agno.reasoning.helpers import (
                get_next_action,
                update_messages_with_reasoning,
            )

            # Get default reasoning agent
            use_json_mode: bool = self.use_json_mode

            reasoning_agent: Optional[Agent] = self.reasoning_agent  # type: ignore
            if reasoning_agent is None:
                reasoning_agent = get_default_reasoning_agent(
                    reasoning_model=reasoning_model,
                    min_steps=self.reasoning_min_steps,
                    max_steps=self.reasoning_max_steps,
                    monitoring=self.monitoring,
                    telemetry=self.telemetry,
                    debug_mode=self.debug_mode,
                    use_json_mode=use_json_mode,
                )

            # Validate reasoning agent
            if reasoning_agent is None:
                log_warning(
                    "Reasoning error. Reasoning agent is None, continuing regular session..."
                )
                return
            # Ensure the reasoning agent response model is ReasoningSteps
            if (
                reasoning_agent.response_model is not None
                and not isinstance(reasoning_agent.response_model, type)
                and not issubclass(reasoning_agent.response_model, ReasoningSteps)
            ):
                log_warning(
                    "Reasoning agent response model should be `ReasoningSteps`, continuing regular session..."
                )
                return
            # Ensure the reasoning model and agent do not show tool calls
            reasoning_agent.show_tool_calls = False

            step_count = 1
            next_action = NextAction.CONTINUE
            reasoning_messages: List[Message] = []
            all_reasoning_steps: List[ReasoningStep] = []
            log_debug("Starting Reasoning", center=True, symbol="=")
            while (
                next_action == NextAction.CONTINUE
                and step_count < self.reasoning_max_steps
            ):
                log_debug(f"Step {step_count}", center=True, symbol="-")
                step_count += 1
                try:
                    # Run the reasoning agent
                    reasoning_agent_response: RunResponse = reasoning_agent.run(  # type: ignore
                        messages=run_messages.get_input_messages()
                    )
                    if (
                        reasoning_agent_response.content is None
                        or reasoning_agent_response.messages is None
                    ):
                        log_warning(
                            "Reasoning error. Reasoning response is empty, continuing regular session..."
                        )
                        break

                    if reasoning_agent_response.content.reasoning_steps is None:
                        log_warning(
                            "Reasoning error. Reasoning steps are empty, continuing regular session..."
                        )
                        break

                    reasoning_steps: List[ReasoningStep] = (
                        reasoning_agent_response.content.reasoning_steps
                    )
                    all_reasoning_steps.extend(reasoning_steps)
                    # Yield reasoning steps
                    if self.stream_intermediate_steps:
                        for reasoning_step in reasoning_steps:
                            updated_reasoning_content = (
                                self._format_reasoning_step_content(
                                    run_response, reasoning_step
                                )
                            )

                            yield create_team_reasoning_step_event(
                                from_run_response=run_response,
                                reasoning_step=reasoning_step,
                                reasoning_content=updated_reasoning_content,
                            )

                    # Find the index of the first assistant message
                    first_assistant_index = next(
                        (
                            i
                            for i, m in enumerate(reasoning_agent_response.messages)
                            if m.role == "assistant"
                        ),
                        len(reasoning_agent_response.messages),
                    )
                    # Extract reasoning messages starting from the message after the first assistant message
                    reasoning_messages = reasoning_agent_response.messages[
                        first_assistant_index:
                    ]

                    # Add reasoning step to the Agent's run_response
                    update_run_response_with_reasoning(
                        run_response=run_response,
                        reasoning_steps=reasoning_steps,
                        reasoning_agent_messages=reasoning_agent_response.messages,
                    )

                    # Get the next action
                    next_action = get_next_action(reasoning_steps[-1])
                    if next_action == NextAction.FINAL_ANSWER:
                        break
                except Exception as e:
                    log_error(f"Reasoning error: {e}")
                    break

            log_debug(f"Total Reasoning steps: {len(all_reasoning_steps)}")
            log_debug("Reasoning finished", center=True, symbol="=")

            # Update the messages_for_model to include reasoning messages
            update_messages_with_reasoning(
                run_messages=run_messages,
                reasoning_messages=reasoning_messages,
            )

            # Yield the final reasoning completed event
            if self.stream_intermediate_steps:
                yield create_team_reasoning_completed_event(
                    from_run_response=run_response,
                    content=ReasoningSteps(reasoning_steps=all_reasoning_steps),
                    content_type=ReasoningSteps.__name__,
                )

    async def _areason(
        self,
        run_response: TeamRunResponse,
        run_messages: RunMessages,
    ) -> AsyncIterator[TeamRunResponseEvent]:
        """Run the reasoning phase of a team run asynchronously.

        If ``self.reasoning_model`` was explicitly provided and is a native
        reasoning model (DeepSeek, Groq, OpenAI, Ollama, or Azure AI Foundry),
        a single reasoning message is obtained from it and appended to
        ``run_messages``. Otherwise, a default Chain-of-Thought reasoning
        agent is run step-by-step until it signals a final answer or the step
        budget is exhausted. Reasoning steps and messages are written onto
        ``run_response`` as a side effect.

        Args:
            run_response: The in-progress team run response; enriched with
                reasoning steps/messages via ``update_run_response_with_reasoning``.
            run_messages: Message state for this run; reasoning output is
                appended/merged into it before the main model call.

        Yields:
            Reasoning started/step/completed ``TeamRunResponseEvent``s, only
            when ``self.stream_intermediate_steps`` is enabled.
        """
        if self.stream_intermediate_steps:
            yield create_team_reasoning_started_event(from_run_response=run_response)

        use_default_reasoning = False

        # Get the reasoning model
        reasoning_model: Optional[Model] = self.reasoning_model
        reasoning_model_provided = reasoning_model is not None
        if reasoning_model is None and self.model is not None:
            from copy import deepcopy

            # Deep-copy so reasoning-specific state never leaks into the main model.
            reasoning_model = deepcopy(self.model)
        if reasoning_model is None:
            log_warning(
                "Reasoning error. Reasoning model is None, continuing regular session..."
            )
            return

        # If a reasoning model is provided, use it to generate reasoning
        if reasoning_model_provided:
            from agno.reasoning.azure_ai_foundry import is_ai_foundry_reasoning_model
            from agno.reasoning.deepseek import is_deepseek_reasoning_model
            from agno.reasoning.groq import is_groq_reasoning_model
            from agno.reasoning.helpers import get_reasoning_agent
            from agno.reasoning.ollama import is_ollama_reasoning_model
            from agno.reasoning.openai import is_openai_reasoning_model

            reasoning_agent = self.reasoning_agent or get_reasoning_agent(
                reasoning_model=reasoning_model, monitoring=self.monitoring
            )
            is_deepseek = is_deepseek_reasoning_model(reasoning_model)
            is_groq = is_groq_reasoning_model(reasoning_model)
            is_openai = is_openai_reasoning_model(reasoning_model)
            is_ollama = is_ollama_reasoning_model(reasoning_model)
            is_ai_foundry = is_ai_foundry_reasoning_model(reasoning_model)

            # Native reasoning models produce one reasoning message in a single call.
            if is_deepseek or is_groq or is_openai or is_ollama or is_ai_foundry:
                reasoning_message: Optional[Message] = None
                if is_deepseek:
                    from agno.reasoning.deepseek import aget_deepseek_reasoning

                    log_debug("Starting DeepSeek Reasoning", center=True, symbol="=")
                    reasoning_message = await aget_deepseek_reasoning(
                        reasoning_agent=reasoning_agent,
                        messages=run_messages.get_input_messages(),
                    )
                elif is_groq:
                    from agno.reasoning.groq import aget_groq_reasoning

                    log_debug("Starting Groq Reasoning", center=True, symbol="=")
                    reasoning_message = await aget_groq_reasoning(
                        reasoning_agent=reasoning_agent,
                        messages=run_messages.get_input_messages(),
                    )
                elif is_openai:
                    from agno.reasoning.openai import aget_openai_reasoning

                    log_debug("Starting OpenAI Reasoning", center=True, symbol="=")
                    reasoning_message = await aget_openai_reasoning(
                        reasoning_agent=reasoning_agent,
                        messages=run_messages.get_input_messages(),
                    )
                elif is_ollama:
                    # NOTE(review): this is the synchronous Ollama helper inside an
                    # async method — it will block the event loop; confirm whether
                    # an async variant (aget_ollama_reasoning) exists.
                    from agno.reasoning.ollama import get_ollama_reasoning

                    log_debug("Starting Ollama Reasoning", center=True, symbol="=")
                    reasoning_message = get_ollama_reasoning(
                        reasoning_agent=reasoning_agent,
                        messages=run_messages.get_input_messages(),
                    )
                elif is_ai_foundry:
                    # NOTE(review): synchronous helper in async path (see Ollama note
                    # above) — will block the event loop; confirm no async variant.
                    from agno.reasoning.azure_ai_foundry import get_ai_foundry_reasoning

                    log_debug(
                        "Starting Azure AI Foundry Reasoning", center=True, symbol="="
                    )
                    reasoning_message = get_ai_foundry_reasoning(
                        reasoning_agent=reasoning_agent,
                        messages=run_messages.get_input_messages(),
                    )

                if reasoning_message is None:
                    log_warning(
                        "Reasoning error. Reasoning response is None, continuing regular session..."
                    )
                    return
                # Inject the reasoning message ahead of the main model call.
                run_messages.messages.append(reasoning_message)
                # Add reasoning step to the Agent's run_response
                update_run_response_with_reasoning(
                    run_response=run_response,
                    reasoning_steps=[ReasoningStep(result=reasoning_message.content)],
                    reasoning_agent_messages=[reasoning_message],
                )
                if self.stream_intermediate_steps:
                    yield create_team_reasoning_completed_event(
                        from_run_response=run_response,
                        content=ReasoningSteps(
                            reasoning_steps=[
                                ReasoningStep(result=reasoning_message.content)
                            ]
                        ),
                        content_type=ReasoningSteps.__name__,
                    )
            else:
                log_warning(
                    f"Reasoning model: {reasoning_model.__class__.__name__} is not a native reasoning model, defaulting to manual Chain-of-Thought reasoning"
                )
                use_default_reasoning = True
        # If no reasoning model is provided, use default reasoning
        else:
            use_default_reasoning = True

        if use_default_reasoning:
            from agno.reasoning.default import get_default_reasoning_agent
            from agno.reasoning.helpers import (
                get_next_action,
                update_messages_with_reasoning,
            )

            # Get default reasoning agent
            use_json_mode: bool = self.use_json_mode
            reasoning_agent: Optional[Agent] = self.reasoning_agent  # type: ignore
            if reasoning_agent is None:
                reasoning_agent = get_default_reasoning_agent(  # type: ignore
                    reasoning_model=reasoning_model,
                    min_steps=self.reasoning_min_steps,
                    max_steps=self.reasoning_max_steps,
                    monitoring=self.monitoring,
                    telemetry=self.telemetry,
                    debug_mode=self.debug_mode,
                    use_json_mode=use_json_mode,
                )

            # Validate reasoning agent
            if reasoning_agent is None:
                log_warning(
                    "Reasoning error. Reasoning agent is None, continuing regular session..."
                )
                return
            # Ensure the reasoning agent response model is ReasoningSteps
            # NOTE(review): this guard only fires when response_model is NOT a
            # class, and issubclass() on a non-class raises TypeError; a class
            # that isn't ReasoningSteps passes silently. The isinstance/issubclass
            # conditions look inverted — confirm intended.
            if (
                reasoning_agent.response_model is not None
                and not isinstance(reasoning_agent.response_model, type)
                and not issubclass(reasoning_agent.response_model, ReasoningSteps)
            ):
                log_warning(
                    "Reasoning agent response model should be `ReasoningSteps`, continuing regular session..."
                )
                return

            # Ensure the reasoning model and agent do not show tool calls
            reasoning_agent.show_tool_calls = False

            step_count = 1
            next_action = NextAction.CONTINUE
            reasoning_messages: List[Message] = []
            all_reasoning_steps: List[ReasoningStep] = []
            log_debug("Starting Reasoning", center=True, symbol="=")
            # NOTE(review): step_count starts at 1 and is incremented before the
            # agent runs, so at most reasoning_max_steps - 1 iterations execute
            # here — confirm the off-by-one is intended.
            while (
                next_action == NextAction.CONTINUE
                and step_count < self.reasoning_max_steps
            ):
                log_debug(f"Step {step_count}", center=True, symbol="-")
                step_count += 1
                try:
                    # Run the reasoning agent
                    reasoning_agent_response: RunResponse = await reasoning_agent.arun(  # type: ignore
                        messages=run_messages.get_input_messages()
                    )
                    if (
                        reasoning_agent_response.content is None
                        or reasoning_agent_response.messages is None
                    ):
                        log_warning(
                            "Reasoning error. Reasoning response is empty, continuing regular session..."
                        )
                        break

                    if reasoning_agent_response.content.reasoning_steps is None:
                        log_warning(
                            "Reasoning error. Reasoning steps are empty, continuing regular session..."
                        )
                        break

                    reasoning_steps: List[ReasoningStep] = (
                        reasoning_agent_response.content.reasoning_steps
                    )
                    all_reasoning_steps.extend(reasoning_steps)
                    # Yield reasoning steps
                    if self.stream_intermediate_steps:
                        for reasoning_step in reasoning_steps:
                            updated_reasoning_content = (
                                self._format_reasoning_step_content(
                                    run_response, reasoning_step
                                )
                            )

                            yield create_team_reasoning_step_event(
                                from_run_response=run_response,
                                reasoning_step=reasoning_step,
                                reasoning_content=updated_reasoning_content,
                            )

                    # Find the index of the first assistant message
                    first_assistant_index = next(
                        (
                            i
                            for i, m in enumerate(reasoning_agent_response.messages)
                            if m.role == "assistant"
                        ),
                        len(reasoning_agent_response.messages),
                    )
                    # Extract reasoning messages starting from the message after the first assistant message
                    reasoning_messages = reasoning_agent_response.messages[
                        first_assistant_index:
                    ]

                    # Add reasoning step to the Agent's run_response
                    update_run_response_with_reasoning(
                        run_response=run_response,
                        reasoning_steps=reasoning_steps,
                        reasoning_agent_messages=reasoning_agent_response.messages,
                    )

                    # Get the next action
                    next_action = get_next_action(reasoning_steps[-1])
                    if next_action == NextAction.FINAL_ANSWER:
                        break
                except Exception as e:
                    # Best-effort: a reasoning failure aborts reasoning but not the run.
                    log_error(f"Reasoning error: {e}")
                    break

            log_debug(f"Total Reasoning steps: {len(all_reasoning_steps)}")
            log_debug("Reasoning finished", center=True, symbol="=")

            # Update the messages_for_model to include reasoning messages
            update_messages_with_reasoning(
                run_messages=run_messages,
                reasoning_messages=reasoning_messages,
            )

            # Yield the final reasoning completed event
            if self.stream_intermediate_steps:
                yield create_team_reasoning_completed_event(
                    from_run_response=run_response,
                    content=ReasoningSteps(reasoning_steps=all_reasoning_steps),
                    content_type=ReasoningSteps.__name__,
                )

    def _generator_wrapper(
        self, event: TeamRunResponseEvent
    ) -> Iterator[TeamRunResponseEvent]:
        """Expose a single pre-built event as a one-item synchronous stream."""
        yield from (event,)

    async def _async_generator_wrapper(
        self, event: TeamRunResponseEvent
    ) -> AsyncIterator[TeamRunResponseEvent]:
        """Expose a single pre-built event as a one-item asynchronous stream."""
        # `yield from` is not permitted in async generators; yield directly.
        yield event

    def _create_run_response(
        self,
        session_id: str,
        content: Optional[Any] = None,
        content_type: Optional[str] = None,
        thinking: Optional[str] = None,
        run_state: RunStatus = RunStatus.running,
        tools: Optional[List[ToolExecution]] = None,
        reasoning_content: Optional[str] = None,
        audio: Optional[List[AudioArtifact]] = None,
        images: Optional[List[ImageArtifact]] = None,
        videos: Optional[List[VideoArtifact]] = None,
        response_audio: Optional[AudioResponse] = None,
        citations: Optional[Citations] = None,
        model: Optional[str] = None,
        messages: Optional[List[Message]] = None,
        created_at: Optional[int] = None,
        from_run_response: Optional[TeamRunResponse] = None,
    ) -> TeamRunResponse:
        """Assemble a ``TeamRunResponse`` for the current run.

        When ``from_run_response`` is supplied, its content, media, and
        metadata fields take precedence over the individually passed
        arguments. ``run_id`` and ``team_id`` always come from ``self``.
        """
        member_responses = None
        formatted_tool_calls = None
        extra_data = None

        source = from_run_response
        if source:
            # An existing response supersedes the individual arguments.
            content = source.content
            content_type = source.content_type
            audio = source.audio
            images = source.images
            videos = source.videos
            response_audio = source.response_audio
            model = source.model
            messages = source.messages
            extra_data = source.extra_data
            member_responses = source.member_responses
            citations = source.citations
            tools = source.tools
            formatted_tool_calls = source.formatted_tool_calls
            reasoning_content = source.reasoning_content

        response = TeamRunResponse(
            run_id=self.run_id,
            session_id=session_id,
            team_id=self.team_id,
            content=content,
            thinking=thinking,
            tools=tools,
            audio=audio,
            images=images,
            videos=videos,
            response_audio=response_audio,
            reasoning_content=reasoning_content,
            citations=citations,
            model=model,
            messages=messages,
            extra_data=extra_data,
            status=run_state,
        )
        # Only override the constructed response's defaults when we actually
        # have values to set.
        if formatted_tool_calls:
            response.formatted_tool_calls = formatted_tool_calls
        if member_responses:
            response.member_responses = member_responses
        if content_type is not None:
            response.content_type = content_type
        if created_at is not None:
            response.created_at = created_at
        return response

    def _resolve_run_context(self) -> None:
        """Materialize callable entries of ``self.context`` in place.

        Callables declaring an ``agent`` parameter are invoked with this team
        as that argument; any resolution failure is logged and leaves the
        original entry untouched. Non-dict contexts are only warned about.
        """
        from inspect import signature

        log_debug("Resolving context")
        if self.context is None:
            return
        if not isinstance(self.context, dict):
            log_warning("Context is not a dict")
            return

        for key, value in self.context.items():
            if not callable(value):
                # Plain values are written back as-is.
                self.context[key] = value
                continue
            try:
                params = signature(value).parameters
                resolved = value(agent=self) if "agent" in params else value()
                if resolved is not None:
                    self.context[key] = resolved
            except Exception as e:
                log_warning(f"Failed to resolve context for {key}: {e}")

    def determine_tools_for_model(
        self,
        model: Model,
        session_id: str,
        user_id: Optional[str] = None,
        async_mode: bool = False,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        message: Optional[Union[str, List, Dict, Message]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        audio: Optional[Sequence[Audio]] = None,
        files: Optional[Sequence[File]] = None,
    ) -> None:
        """Assemble and register the tool set for this run's model call.

        Collects user-supplied tools plus the team's built-in tools (history,
        agentic memory, shared context, knowledge search) and the mode-specific
        member-delegation function, then normalizes everything into
        ``self._tools_for_model`` (model-facing schemas) and
        ``self._functions_for_model`` (executable Function objects).

        Args:
            model: The model the tools are being prepared for (used to decide
                strict structured-output mode).
            session_id: Current session id, baked into session-scoped tools.
            user_id: Optional user id for memory tools.
            async_mode: Selects async variants of memory/knowledge tools.
            knowledge_filters: Filters forwarded to knowledge-search tools.
            message: The incoming user message (only used in "route" mode to
                build the forwarded task).
            images/videos/audio/files: Media forwarded to member runs.
        """
        # Prepare tools
        _tools: List[Union[Toolkit, Callable, Function, Dict]] = []

        # Add provided tools
        if self.tools is not None:
            for tool in self.tools:
                _tools.append(tool)

        if self.read_team_history:
            _tools.append(self.get_team_history_function(session_id=session_id))

        # Agentic memory tool only applies to Memory v2.
        if isinstance(self.memory, Memory) and self.enable_agentic_memory:
            _tools.append(
                self.get_update_user_memory_function(
                    user_id=user_id, async_mode=async_mode
                )
            )

        if self.enable_agentic_context:
            _tools.append(self.get_set_shared_context_function(session_id=session_id))

        if self.knowledge is not None or self.retriever is not None:
            # Check if retriever is an async function but used in sync mode
            from inspect import iscoroutinefunction

            if self.retriever is not None and iscoroutinefunction(self.retriever):
                log_warning(
                    "Async retriever function is being used with synchronous agent.run() or agent.print_response(). "
                    "It is recommended to use agent.arun() or agent.aprint_response() instead."
                )

            if self.search_knowledge:
                # Use async or sync search based on async_mode
                if self.enable_agentic_knowledge_filters:
                    _tools.append(
                        self.search_knowledge_base_with_agentic_filters_function(
                            knowledge_filters=knowledge_filters, async_mode=async_mode
                        )
                    )
                else:
                    _tools.append(
                        self.search_knowledge_base_function(
                            knowledge_filters=knowledge_filters, async_mode=async_mode
                        )
                    )

        # Mode-specific member delegation tool.
        # NOTE(review): all three branches below pass async_mode=False even when
        # this method was called with async_mode=True (unlike the memory and
        # knowledge tools above) — confirm this is intended.
        if self.mode == "route":
            user_message = self._get_user_message(
                message,
                audio=audio,
                images=images,
                videos=videos,
                files=files,
                user_id=user_id,
            )
            forward_task_func: Function = self.get_forward_task_function(
                message=user_message,
                session_id=session_id,
                stream=self.stream or False,
                stream_intermediate_steps=self.stream_intermediate_steps,
                async_mode=False,
                images=images,  # type: ignore
                videos=videos,  # type: ignore
                audio=audio,  # type: ignore
                files=files,  # type: ignore
                knowledge_filters=knowledge_filters,
            )
            _tools.append(forward_task_func)
            if self.get_member_information_tool:
                _tools.append(self.get_member_information)

        elif self.mode == "coordinate":
            _tools.append(
                self.get_transfer_task_function(
                    session_id=session_id,
                    stream=self.stream or False,
                    stream_intermediate_steps=self.stream_intermediate_steps,
                    async_mode=False,
                    images=images,  # type: ignore
                    videos=videos,  # type: ignore
                    audio=audio,  # type: ignore
                    files=files,  # type: ignore
                    knowledge_filters=knowledge_filters,
                )
            )
            if self.get_member_information_tool:
                _tools.append(self.get_member_information)

        elif self.mode == "collaborate":
            run_member_agents_func = self.get_run_member_agents_function(
                session_id=session_id,
                stream=self.stream or False,
                stream_intermediate_steps=self.stream_intermediate_steps,
                async_mode=False,
                images=images,  # type: ignore
                videos=videos,  # type: ignore
                audio=audio,  # type: ignore
                files=files,  # type: ignore
            )
            _tools.append(run_member_agents_func)

            if self.get_member_information_tool:
                _tools.append(self.get_member_information)

        # Reset registries; this method rebuilds them from scratch on each call.
        self._functions_for_model = {}
        self._tools_for_model = []

        # Get Agent tools
        if len(_tools) > 0:
            log_debug("Processing tools for model")

        # Check if we need strict mode for the model
        # Strict schemas are required for native structured outputs.
        strict = False
        if (
            self.response_model is not None
            and not self.use_json_mode
            and model.supports_native_structured_outputs
        ):
            strict = True

        for tool in _tools:
            if isinstance(tool, Dict):
                # If a dict is passed, it is a builtin tool
                # that is run by the model provider and not the Agent
                self._tools_for_model.append(tool)
                log_debug(f"Included builtin tool {tool}")

            elif isinstance(tool, Toolkit):
                # For each function in the toolkit and process entrypoint
                for name, func in tool.functions.items():
                    # If the function does not exist in self.functions
                    if name not in self._functions_for_model:
                        # NOTE(review): both _agent and _team are set to this Team —
                        # confirm downstream consumers expect a Team in _agent.
                        func._agent = self
                        func._team = self
                        func.process_entrypoint(strict=strict)
                        if strict:
                            func.strict = True
                        if self.tool_hooks:
                            func.tool_hooks = self.tool_hooks
                        self._functions_for_model[name] = func
                        self._tools_for_model.append(
                            {"type": "function", "function": func.to_dict()}
                        )
                        log_debug(f"Added tool {name} from {tool.name}")

                # Add instructions from the toolkit
                if tool.add_instructions and tool.instructions is not None:
                    if self._tool_instructions is None:
                        self._tool_instructions = []
                    self._tool_instructions.append(tool.instructions)

            elif isinstance(tool, Function):
                if tool.name not in self._functions_for_model:
                    tool._agent = self
                    tool._team = self
                    tool.process_entrypoint(strict=strict)
                    # NOTE(review): only overrides strict when unset, unlike the
                    # Toolkit branch above which overrides unconditionally —
                    # confirm the inconsistency is intended.
                    if strict and tool.strict is None:
                        tool.strict = True
                    if self.tool_hooks:
                        tool.tool_hooks = self.tool_hooks
                    self._functions_for_model[tool.name] = tool
                    self._tools_for_model.append(
                        {"type": "function", "function": tool.to_dict()}
                    )
                    log_debug(f"Added tool {tool.name}")

                # Add instructions from the Function
                if tool.add_instructions and tool.instructions is not None:
                    if self._tool_instructions is None:
                        self._tool_instructions = []
                    self._tool_instructions.append(tool.instructions)

            elif callable(tool):
                # We add the tools, which are callable functions
                try:
                    func = Function.from_callable(tool, strict=strict)
                    func._agent = self
                    func._team = self
                    if strict:
                        func.strict = True
                    if self.tool_hooks:
                        func.tool_hooks = self.tool_hooks
                    self._functions_for_model[func.name] = func
                    self._tools_for_model.append(
                        {"type": "function", "function": func.to_dict()}
                    )
                    log_debug(f"Added tool {func.name}")
                except Exception as e:
                    # Best-effort: skip tools that cannot be converted.
                    log_warning(f"Could not add tool {tool}: {e}")

    def get_members_system_message_content(self, indent: int = 0) -> str:
        """Render a nested, indented description of all team members.

        Tells the model which members exist, their IDs (used for task
        delegation), names, roles, and — when enabled — their tools.
        Sub-teams are rendered recursively at a deeper indent level.

        Args:
            indent: Number of leading spaces for the current nesting level.

        Returns:
            A multi-line string describing every member of this team.
        """
        pad = indent * " "
        system_message_content = ""
        for idx, member in enumerate(self.members):
            url_safe_member_id = self._get_member_id(member)

            if isinstance(member, Team):
                system_message_content += f"{pad} - Team: {member.name}\n"
                system_message_content += f"{pad} - ID: {url_safe_member_id}\n"
                if member.members is not None:
                    system_message_content += member.get_members_system_message_content(
                        indent=indent + 2
                    )
            else:
                system_message_content += f"{pad} - Agent {idx + 1}:\n"
                # Fix: always emit the member ID. Previously the ID line was
                # nested under `if member.name is not None`, so unnamed members
                # were listed without an ID and could not be addressed by the
                # model (the Team branch above always emits the ID).
                system_message_content += f"{pad}   - ID: {url_safe_member_id}\n"
                if member.name is not None:
                    system_message_content += f"{pad}   - Name: {member.name}\n"
                if member.role is not None:
                    system_message_content += f"{pad}   - Role: {member.role}\n"
                if member.tools is not None and self.add_member_tools_to_system_message:
                    system_message_content += f"{pad}   - Member tools:\n"
                    for _tool in member.tools:
                        if isinstance(_tool, Toolkit):
                            # List only functions with a real entrypoint.
                            for _func in _tool.functions.values():
                                if _func.entrypoint:
                                    system_message_content += (
                                        f"{pad}    - {_func.name}\n"
                                    )
                        elif isinstance(_tool, Function) and _tool.entrypoint:
                            system_message_content += f"{pad}    - {_tool.name}\n"
                        elif callable(_tool):
                            system_message_content += f"{pad}    - {_tool.__name__}\n"

        return system_message_content

    def get_system_message(
        self,
        session_id: str,
        user_id: Optional[str] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
    ) -> Optional[Message]:
        """Build the system message for the team leader model.

        A user-supplied ``self.system_message`` (Message, str, or callable)
        takes precedence and is returned (nearly) as-is. Otherwise the default
        team-leader prompt is assembled from the team configuration: member
        roster, coordination mode, shared context, memories, session summary,
        instructions, expected output, and an optional JSON-output prompt.

        Args:
            session_id: Used to look up the session summary when
                ``add_session_summary_references`` is enabled.
            user_id: Used to look up user memories / summaries; falls back to
                "default" when memory features need an id and none is given.
            audio, images, videos, files: Media attached to the current user
                message; only their presence is described in the prompt.

        Returns:
            The system Message (role ``self.system_message_role``), the
            user-provided Message unchanged, or the assembled default prompt.
        """

        # 1. If the system_message is provided, use that.
        if self.system_message is not None:
            if isinstance(self.system_message, Message):
                return self.system_message

            sys_message_content: str = ""
            if isinstance(self.system_message, str):
                sys_message_content = self.system_message
            elif callable(self.system_message):
                # NOTE: the callable is invoked with agent=self even though
                # self is a Team — the callable's signature must accept `agent`.
                sys_message_content = self.system_message(agent=self)
                if not isinstance(sys_message_content, str):
                    raise Exception("system_message must return a string")

            # Format the system message with the session state variables
            if self.add_state_in_messages:
                sys_message_content = self._format_message_with_state_variables(
                    sys_message_content, user_id=user_id
                )

            return Message(role=self.system_message_role, content=sys_message_content)  # type: ignore

        # 2. Otherwise, build and return the default system message for the Team.
        # 2.1 Build the list of instructions for the system message
        self.model = cast(Model, self.model)
        instructions: List[str] = []
        if self.instructions is not None:
            _instructions = self.instructions
            if callable(self.instructions):
                # Same calling convention as system_message: agent=self.
                _instructions = self.instructions(agent=self)

            if isinstance(_instructions, str):
                instructions.append(_instructions)
            elif isinstance(_instructions, list):
                instructions.extend(_instructions)

        # 2.2 Add instructions from the Model (may depend on the tools in use)
        _model_instructions = self.model.get_instructions_for_model(
            self._tools_for_model
        )
        if _model_instructions is not None:
            instructions.extend(_model_instructions)

        # 2.3 Build a list of additional information for the system message
        additional_information: List[str] = []
        # 2.3.1 Add instructions for using markdown (skipped for structured output)
        if self.markdown and self.response_model is None:
            additional_information.append("Use markdown to format your answers.")
        # 2.3.2 Add the current datetime
        if self.add_datetime_to_instructions:
            from datetime import datetime

            additional_information.append(f"The current time is {datetime.now()}")

        # 2.3.3 Add the current (approximate) location, when resolvable
        if self.add_location_to_instructions:
            from agno.utils.location import get_location

            location = get_location()
            if location:
                # Join only the components that are present.
                location_str = ", ".join(
                    filter(
                        None,
                        [
                            location.get("city"),
                            location.get("region"),
                            location.get("country"),
                        ],
                    )
                )
                if location_str:
                    additional_information.append(
                        f"Your approximate location is: {location_str}."
                    )

        # 2.3.4 Agentic knowledge-filter guidance: advertise the valid metadata
        # filter keys so the model can pass filters to search_knowledge_base.
        if self.knowledge is not None and self.enable_agentic_knowledge_filters:
            valid_filters = getattr(self.knowledge, "valid_metadata_filters", None)
            if valid_filters:
                valid_filters_str = ", ".join(valid_filters)
                # NOTE(review): the examples below are numbered 1, 2, 4 — looks
                # like a typo in the prompt text; confirm before changing it.
                additional_information.append(
                    dedent(f"""
                    The knowledge base contains documents with these metadata filters: {valid_filters_str}.
                    Always use filters when the user query indicates specific metadata.
                    Examples:
                    1. If the user asks about a specific person like "Jordan Mitchell", you MUST use the search_knowledge_base tool with the filters parameter set to {{'<valid key like user_id>': '<valid value based on the user query>'}}.
                    2. If the user asks about a specific document type like "contracts", you MUST use the search_knowledge_base tool with the filters parameter set to {{'document_type': 'contract'}}.
                    4. If the user asks about a specific location like "documents from New York", you MUST use the search_knowledge_base tool with the filters parameter set to {{'<valid key like location>': 'New York'}}.
                    General Guidelines:
                    - Always analyze the user query to identify relevant metadata.
                    - Use the most specific filter(s) possible to narrow down results.
                    - If multiple filters are relevant, combine them in the filters parameter (e.g., {{'name': 'Jordan Mitchell', 'document_type': 'contract'}}).
                    - Ensure the filter keys match the valid metadata filters: {valid_filters_str}.
                    You can use the search_knowledge_base tool to search the knowledge base and get the most relevant documents. Make sure to pass the filters as [Dict[str: Any]] to the tool. FOLLOW THIS STRUCTURE STRICTLY.
                """)
                )

        # 3. Assemble the default system message content.
        # 3.1 Leader preamble
        system_message_content: str = ""
        system_message_content += (
            "You are the leader of a team and sub-teams of AI Agents.\n"
        )
        system_message_content += (
            "Your task is to coordinate the team to complete the user's request.\n"
        )

        # 3.2 Member roster (recursive for sub-teams)
        system_message_content += "\nHere are the members in your team:\n"
        system_message_content += "<team_members>\n"
        system_message_content += self.get_members_system_message_content()
        if self.get_member_information_tool:
            system_message_content += "If you need to get information about your team members, you can use the `get_member_information` tool at any time.\n"
        system_message_content += "</team_members>\n"

        # 3.3 Mode-specific response guidance (coordinate / route / collaborate)
        system_message_content += "\n<how_to_respond>\n"
        if self.mode == "coordinate":
            system_message_content += (
                "- You can either respond directly or transfer tasks to members in your team with the highest likelihood of completing the user's request.\n"
                "- Carefully analyze the tools available to the members and their roles before transferring tasks.\n"
                "- You cannot use a member tool directly. You can only transfer tasks to members.\n"
                "- When you transfer a task to another member, make sure to include:\n"
                "  - member_id (str): The ID of the member to forward the task to.\n"
                "  - task_description (str): A clear description of the task.\n"
                "  - expected_output (str): The expected output.\n"
                "- You can transfer tasks to multiple members at once.\n"
                "- You must always analyze the responses from members before responding to the user.\n"
                "- After analyzing the responses from the members, if you feel the task has been completed, you can stop and respond to the user.\n"
                "- If you are not satisfied with the responses from the members, you should re-assign the task.\n"
            )
        elif self.mode == "route":
            system_message_content += (
                "- You can either respond directly or forward tasks to members in your team with the highest likelihood of completing the user's request.\n"
                "- Carefully analyze the tools available to the members and their roles before forwarding tasks.\n"
                "- When you forward a task to another Agent, make sure to include:\n"
                "  - member_id (str): The ID of the member to forward the task to.\n"
                "  - expected_output (str): The expected output.\n"
                "- You can forward tasks to multiple members at once.\n"
            )
        elif self.mode == "collaborate":
            system_message_content += (
                "- You can either respond directly or use the `run_member_agents` tool to run all members in your team to get a collaborative response.\n"
                "- To run the members in your team, call `run_member_agents` ONLY once. This will run all members in your team.\n"
                "- Analyze the responses from all members and evaluate whether the task has been completed.\n"
                "- If you feel the task has been completed, you can stop and respond to the user.\n"
            )
        system_message_content += "</how_to_respond>\n\n"

        # 3.4 Shared-context instructions (agentic context)
        if self.enable_agentic_context:
            system_message_content += "<shared_context>\n"
            system_message_content += "You have access to a shared context that will be shared with all members of the team.\n"
            system_message_content += "Use this shared context to improve inter-agent communication and coordination.\n"
            system_message_content += "It is important that you update the shared context as often as possible.\n"
            system_message_content += (
                "To update the shared context, use the `set_shared_context` tool.\n"
            )
            system_message_content += "</shared_context>\n\n"

        # 3.5 Team name
        if self.name is not None:
            system_message_content += f"Your name is: {self.name}\n\n"

        # 3.6 Success criteria (tells the leader when to stop)
        if self.success_criteria:
            system_message_content += (
                "Your task is successful when the following criteria is met:\n"
            )
            system_message_content += "<success_criteria>\n"
            system_message_content += f"{self.success_criteria}\n"
            system_message_content += "</success_criteria>\n"
            system_message_content += (
                "Stop the team run when the success_criteria is met.\n\n"
            )

        # 3.7 Attached media: only the presence of each kind is mentioned
        if (
            audio is not None
            or images is not None
            or videos is not None
            or files is not None
        ):
            system_message_content += "<attached_media>\n"
            system_message_content += (
                "You have the following media attached to your message:\n"
            )
            if audio is not None and len(audio) > 0:
                system_message_content += " - Audio\n"
            if images is not None and len(images) > 0:
                system_message_content += " - Images\n"
            if videos is not None and len(videos) > 0:
                system_message_content += " - Videos\n"
            if files is not None and len(files) > 0:
                system_message_content += " - Files\n"
            system_message_content += "</attached_media>\n\n"

        # 3.8 User memories and session summary (Memory v2 only)
        if self.memory:
            if isinstance(self.memory, Memory) and self.add_memory_references:
                # Memory lookups require a user id; fall back to "default".
                if not user_id:
                    user_id = "default"
                user_memories = self.memory.get_user_memories(user_id=user_id)  # type: ignore
                if user_memories and len(user_memories) > 0:
                    system_message_content += "You have access to memories from previous interactions with the user that you can use:\n\n"
                    system_message_content += "<memories_from_previous_interactions>"
                    for _memory in user_memories:  # type: ignore
                        system_message_content += f"\n- {_memory.memory}"
                    system_message_content += (
                        "\n</memories_from_previous_interactions>\n\n"
                    )
                    system_message_content += (
                        "Note: this information is from previous interactions and may be updated in this conversation. "
                        "You should always prefer information from this conversation over the past memories.\n\n"
                    )
                else:
                    system_message_content += (
                        "You have the capability to retain memories from previous interactions with the user, "
                        "but have not had any interactions with the user yet.\n"
                    )

                if self.enable_agentic_memory:
                    system_message_content += (
                        "You have access to the `update_user_memory` tool.\n"
                        "You can use the `update_user_memory` tool to add new memories, update existing memories, delete memories, or clear all memories.\n"
                        "Memories should include details that could personalize ongoing interactions with the user.\n"
                        "Use this tool to add new memories or update existing memories that you identify in the conversation.\n"
                        "Use this tool if the user asks to update their memory, delete a memory, or clear all memories.\n"
                        "If you use the `update_user_memory` tool, remember to pass on the response to the user.\n\n"
                    )

            # Then add a summary of this session's previous interactions
            if isinstance(self.memory, Memory) and self.add_session_summary_references:
                if not user_id:
                    user_id = "default"
                # summaries is keyed by user_id, then by session_id.
                session_summary: SessionSummary = self.memory.summaries.get(
                    user_id, {}
                ).get(session_id, None)  # type: ignore
                if session_summary is not None:
                    system_message_content += (
                        "Here is a brief summary of your previous interactions:\n\n"
                    )
                    system_message_content += "<summary_of_previous_interactions>\n"
                    system_message_content += session_summary.summary
                    system_message_content += (
                        "\n</summary_of_previous_interactions>\n\n"
                    )
                    system_message_content += (
                        "Note: this information is from previous interactions and may be outdated. "
                        "You should ALWAYS prefer information from this conversation over the past summary.\n\n"
                    )

        # 3.9 Team description
        if self.description is not None:
            system_message_content += (
                f"<description>\n{self.description}\n</description>\n\n"
            )

        # 3.10 Instructions collected in step 2 (single instruction is inlined,
        # multiple instructions become a bulleted list)
        if len(instructions) > 0:
            system_message_content += "<instructions>"
            if len(instructions) > 1:
                for _upi in instructions:
                    system_message_content += f"\n- {_upi}"
            else:
                system_message_content += "\n" + instructions[0]
            system_message_content += "\n</instructions>\n\n"
        # 3.11 Additional information collected in step 2.3
        if len(additional_information) > 0:
            system_message_content += "<additional_information>"
            for _ai in additional_information:
                system_message_content += f"\n- {_ai}"
            system_message_content += "\n</additional_information>\n\n"
        # 3.12 Instructions contributed by tools/toolkits
        if self._tool_instructions is not None:
            for _ti in self._tool_instructions:
                system_message_content += f"{_ti}\n"

        # 3.13 Format the assembled content with the session state variables
        if self.add_state_in_messages:
            system_message_content = self._format_message_with_state_variables(
                system_message_content, user_id=user_id
            )

        # 3.14 Model-specific system-message suffix (may depend on tools)
        system_message_from_model = self.model.get_system_message_for_model(
            self._tools_for_model
        )
        if system_message_from_model is not None:
            system_message_content += system_message_from_model

        # 3.15 Expected output and additional context
        if self.expected_output is not None:
            system_message_content += f"<expected_output>\n{self.expected_output.strip()}\n</expected_output>\n\n"

        if self.additional_context is not None:
            system_message_content += f"<additional_context>\n{self.additional_context.strip()}\n</additional_context>\n\n"

        # 3.16 JSON output prompt when a response_model is set and JSON mode is
        # requested.
        # NOTE(review): the original comment said this applies "if ...
        # structured_outputs is False", yet the condition requires
        # supports_native_structured_outputs to be True — models *without*
        # native structured-output support never receive the JSON prompt here.
        # Looks possibly inverted; confirm intended behavior before changing.
        if (
            self.response_model is not None
            and self.use_json_mode
            and self.model
            and self.model.supports_native_structured_outputs
        ):
            system_message_content += f"{self._get_json_output_prompt()}"

        return Message(
            role=self.system_message_role, content=system_message_content.strip()
        )

    def get_run_messages(
        self,
        *,
        session_id: str,
        user_id: Optional[str] = None,
        message: Optional[Union[str, List, Dict, Message]] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ) -> RunMessages:
        """Assemble the ordered message list to send to the model for one run.

        The returned RunMessages carries:
            - system_message: the system message for this run (if any)
            - user_message: the user message for this run (if any)
            - messages: system message, then history, then the user message

        Args mirror ``get_system_message`` / ``_get_user_message``;
        ``knowledge_filters`` and ``**kwargs`` are forwarded to the user
        message builder.
        """
        run_messages = RunMessages()

        # System message goes first, when the team produces one.
        sys_msg = self.get_system_message(
            session_id=session_id,
            user_id=user_id,
            images=images,
            audio=audio,
            videos=videos,
            files=files,
        )
        if sys_msg is not None:
            run_messages.system_message = sys_msg
            run_messages.messages.append(sys_msg)

        # Then chat history from previous runs, when enabled.
        if self.enable_team_history or self.add_history_to_messages:
            from copy import deepcopy

            # TeamMemory (v1) and Memory (v2) expose slightly different APIs.
            if isinstance(self.memory, TeamMemory):
                history = self.memory.get_messages_from_last_n_runs(
                    last_n=self.num_history_runs, skip_role=self.system_message_role
                )
            elif isinstance(self.memory, Memory):
                history = self.memory.get_messages_from_last_n_runs(
                    session_id=session_id,
                    last_n=self.num_history_runs,
                    skip_role=self.system_message_role,
                )
            else:
                history = []

            if history:
                # Deep-copy before tagging so the stored messages are never
                # mutated; the tag lets downstream code tell history apart.
                tagged_history = []
                for stored_msg in history:
                    msg_copy = deepcopy(stored_msg)
                    msg_copy.from_history = True
                    tagged_history.append(msg_copy)

                log_debug(f"Adding {len(tagged_history)} messages from history")
                run_messages.messages.extend(tagged_history)

        # Finally the user message, when one can be built.
        user_msg = self._get_user_message(
            message,
            user_id=user_id,
            audio=audio,
            images=images,
            videos=videos,
            files=files,
            knowledge_filters=knowledge_filters,
            **kwargs,
        )
        if user_msg is not None:
            run_messages.user_message = user_msg
            run_messages.messages.append(user_msg)

        return run_messages

    def _get_user_message(
        self,
        message: Optional[Union[str, List, Dict, Message]] = None,
        user_id: Optional[str] = None,
        audio: Optional[Sequence[Audio]] = None,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        files: Optional[Sequence[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """Build the user Message for this run.

        Handles four input shapes:
            - str / list: formatted (optionally with state variables) and
              augmented with knowledge references and context.
            - None: an empty-content Message when media is attached,
              otherwise None.
            - Message: returned unchanged.
            - dict: validated into a Message; None on validation failure.
        """
        self.run_response = cast(TeamRunResponse, self.run_response)

        # Optionally retrieve knowledge-base references for the query so they
        # can be appended to the user message content below.
        references = None
        if self.add_references and message:
            if isinstance(message, str):
                query = message
            elif callable(message):
                query = message(agent=self)
            else:
                raise Exception(
                    "message must be a string or a callable when add_references is True"
                )

            try:
                retrieval_timer = Timer()
                retrieval_timer.start()
                docs_from_knowledge = self.get_relevant_docs_from_knowledge(
                    query=query, filters=knowledge_filters, **kwargs
                )
                if docs_from_knowledge is not None:
                    references = MessageReferences(
                        query=query,
                        references=docs_from_knowledge,
                        time=round(retrieval_timer.elapsed, 4),
                    )
                    # Also record the references on the run_response.
                    if self.run_response.extra_data is None:
                        self.run_response.extra_data = RunResponseExtraData()
                    if self.run_response.extra_data.references is None:
                        self.run_response.extra_data.references = []
                    self.run_response.extra_data.references.append(references)
                retrieval_timer.stop()
                log_debug(f"Time to get references: {retrieval_timer.elapsed:.4f}s")
            except Exception as e:
                # Retrieval is best-effort: a failure must not abort the run.
                log_warning(f"Failed to get references: {e}")

        # str / list input: build the text content, then append references
        # and context before wrapping it in a Message.
        if isinstance(message, (str, list)):
            if self.add_state_in_messages:
                if isinstance(message, str):
                    user_message_content = self._format_message_with_state_variables(
                        message, user_id=user_id
                    )
                else:
                    user_message_content = "\n".join(
                        self._format_message_with_state_variables(msg, user_id=user_id)
                        for msg in message
                    )
            elif isinstance(message, str):
                user_message_content = message
            else:
                user_message_content = "\n".join(message)

            # Append the retrieved knowledge references, if any.
            if (
                self.add_references
                and references is not None
                and references.references is not None
                and len(references.references) > 0
            ):
                user_message_content += "\n\nUse the following references from the knowledge base if it helps:\n"
                user_message_content += "<references>\n"
                user_message_content += (
                    self._convert_documents_to_string(references.references) + "\n"
                )
                user_message_content += "</references>"

            # Append the context dictionary, when configured.
            if self.add_context and self.context is not None:
                user_message_content += "\n\n<context>\n"
                user_message_content += (
                    self._convert_context_to_string(self.context) + "\n"
                )
                user_message_content += "</context>"

            return Message(
                role="user",
                content=user_message_content,
                audio=audio,
                images=images,
                videos=videos,
                files=files,
                **kwargs,
            )

        # No message: attach media to an empty message, or return nothing.
        if message is None:
            if (
                images is None
                and audio is None
                and videos is None
                and files is None
            ):
                return None
            return Message(
                role="user",
                content="",
                images=images,
                audio=audio,
                videos=videos,
                files=files,
                **kwargs,
            )

        # Already a Message: pass it through unchanged.
        if isinstance(message, Message):
            return message

        # dict: attempt to validate into a Message (None on failure).
        if isinstance(message, dict):
            try:
                return Message.model_validate(message)
            except Exception as e:
                log_warning(f"Failed to validate message: {e}")

    def _format_message_with_state_variables(
        self, message: Any, user_id: Optional[str] = None
    ) -> Any:
        """Format a message with the session state variables."""
        import re
        import string

        if not isinstance(message, str):
            return message

        format_variables = ChainMap(
            self.session_state or {},
            self.team_session_state or {},
            self.context or {},
            self.extra_data or {},
            {"user_id": user_id} if user_id is not None else {},
        )
        converted_msg = message
        for var_name in format_variables.keys():
            # Only convert standalone {var_name} patterns, not nested ones
            pattern = r"\{" + re.escape(var_name) + r"\}"
            replacement = "${" + var_name + "}"
            converted_msg = re.sub(pattern, replacement, converted_msg)

        # Use Template to safely substitute variables
        template = string.Template(converted_msg)
        try:
            result = template.safe_substitute(format_variables)
            return result
        except Exception as e:
            log_warning(f"Template substitution failed: {e}")
            return message

    def _convert_context_to_string(self, context: Dict[str, Any]) -> str:
        """Convert the context dictionary to a string representation.

        Args:
            context: Dictionary containing context data

        Returns:
            String representation of the context, or empty string if conversion fails
        """

        if context is None:
            return ""

        import json

        try:
            return json.dumps(context, indent=2, default=str)
        except (TypeError, ValueError, OverflowError) as e:
            log_warning(f"Failed to convert context to JSON: {e}")
            # Attempt a fallback conversion for non-serializable objects
            sanitized_context = {}
            for key, value in context.items():
                try:
                    # Try to serialize each value individually
                    json.dumps({key: value}, default=str)
                    sanitized_context[key] = value
                except Exception as e:
                    log_error(f"Failed to serialize to JSON: {e}")
                    # If serialization fails, convert to string representation
                    sanitized_context[key] = str(value)

            try:
                return json.dumps(sanitized_context, indent=2)
            except Exception as e:
                log_error(f"Failed to convert sanitized context to JSON: {e}")
                return str(context)

    def _get_json_output_prompt(self) -> str:
        """Return the JSON output prompt for the Agent.

        This is added to the system prompt when the response_model is set and structured_outputs is False.
        """
        import json

        json_output_prompt = (
            "Provide your output as a JSON containing the following fields:"
        )
        if self.response_model is not None:
            if isinstance(self.response_model, str):
                json_output_prompt += "\n<json_fields>"
                json_output_prompt += f"\n{self.response_model}"
                json_output_prompt += "\n</json_fields>"
            elif isinstance(self.response_model, list):
                json_output_prompt += "\n<json_fields>"
                json_output_prompt += f"\n{json.dumps(self.response_model)}"
                json_output_prompt += "\n</json_fields>"
            elif issubclass(self.response_model, BaseModel):
                json_schema = self.response_model.model_json_schema()
                if json_schema is not None:
                    response_model_properties = {}
                    json_schema_properties = json_schema.get("properties")
                    if json_schema_properties is not None:
                        for (
                            field_name,
                            field_properties,
                        ) in json_schema_properties.items():
                            formatted_field_properties = {
                                prop_name: prop_value
                                for prop_name, prop_value in field_properties.items()
                                if prop_name != "title"
                            }
                            response_model_properties[field_name] = (
                                formatted_field_properties
                            )
                    json_schema_defs = json_schema.get("$defs")
                    if json_schema_defs is not None:
                        response_model_properties["$defs"] = {}
                        for def_name, def_properties in json_schema_defs.items():
                            def_fields = def_properties.get("properties")
                            formatted_def_properties = {}
                            if def_fields is not None:
                                for field_name, field_properties in def_fields.items():
                                    formatted_field_properties = {
                                        prop_name: prop_value
                                        for prop_name, prop_value in field_properties.items()
                                        if prop_name != "title"
                                    }
                                    formatted_def_properties[field_name] = (
                                        formatted_field_properties
                                    )
                            if len(formatted_def_properties) > 0:
                                response_model_properties["$defs"][def_name] = (
                                    formatted_def_properties
                                )

                    if len(response_model_properties) > 0:
                        json_output_prompt += "\n<json_fields>"
                        json_output_prompt += f"\n{json.dumps([key for key in response_model_properties.keys() if key != '$defs'])}"
                        json_output_prompt += "\n</json_fields>"
                        json_output_prompt += (
                            "\n\nHere are the properties for each field:"
                        )
                        json_output_prompt += "\n<json_field_properties>"
                        json_output_prompt += (
                            f"\n{json.dumps(response_model_properties, indent=2)}"
                        )
                        json_output_prompt += "\n</json_field_properties>"
            else:
                log_warning(f"Could not build json schema for {self.response_model}")
        else:
            json_output_prompt += "Provide the output as JSON."

        json_output_prompt += "\nStart your response with `{` and end it with `}`."
        json_output_prompt += "\nYour output will be passed to json.loads() to convert it to a Python object."
        json_output_prompt += "\nMake sure it only contains valid JSON."
        return json_output_prompt

    def _update_team_media(
        self, run_response: Union[TeamRunResponse, RunResponse]
    ) -> None:
        """Fold any media artifacts produced by a run into the team's own lists.

        Lazily initializes each of ``self.images`` / ``self.videos`` /
        ``self.audio`` before extending it with the corresponding artifacts
        from *run_response*; attributes that are ``None`` on the response are
        left untouched.
        """
        for media_attr in ("images", "videos", "audio"):
            new_items = getattr(run_response, media_attr)
            if new_items is None:
                continue
            collected = getattr(self, media_attr)
            if collected is None:
                collected = []
                setattr(self, media_attr, collected)
            collected.extend(new_items)

    ###########################################################################
    # Built-in Tools
    ###########################################################################

    def get_update_user_memory_function(
        self, user_id: Optional[str] = None, async_mode: bool = False
    ) -> Callable:
        """Build the `update_user_memory` tool for the Memory v2 backend.

        Returns a sync or async callable (selected by ``async_mode``) that
        forwards a natural-language memory-management task to
        ``Memory.update_memory_task`` / ``Memory.aupdate_memory_task`` for the
        given ``user_id``.

        NOTE(review): the inner docstrings double as the tool description shown
        to the model, so their wording is intentionally left unchanged.
        """
        def update_user_memory(task: str) -> str:
            """
            Use this function to submit a task to modify the Agent's memory.
            Describe the task in detail and be specific.
            The task can include adding a memory, updating a memory, deleting a memory, or clearing all memories.

            Args:
                task: The task to update the memory. Be specific and describe the task in detail.

            Returns:
                str: A string indicating the status of the update.
            """
            # Assumes self.memory is a Memory (v2) instance; the cast is a
            # type-checker hint only and performs no runtime conversion.
            self.memory = cast(Memory, self.memory)
            response = self.memory.update_memory_task(task=task, user_id=user_id)
            return response

        async def aupdate_user_memory(task: str) -> str:
            """
            Use this function to submit a task to modify the Agent's memory.
            Describe the task in detail and be specific.
            The task can include adding a memory, updating a memory, deleting a memory, or clearing all memories.

            Args:
                task: The task to update the memory. Be specific and describe the task in detail.

            Returns:
                str: A string indicating the status of the update.
            """
            self.memory = cast(Memory, self.memory)
            response = await self.memory.aupdate_memory_task(task=task, user_id=user_id)
            return response

        if async_mode:
            return aupdate_user_memory
        else:
            return update_user_memory

    def get_member_information(self) -> str:
        """Get information about the members of the team, including their IDs, names, and roles."""
        # Reuses the system-message formatting helper with indent=0 so the text
        # can stand alone (e.g. as a tool result) rather than nested in a prompt.
        return self.get_members_system_message_content(indent=0)

    def get_team_history_function(self, session_id: str) -> Callable:
        """Build the `get_team_history` tool for the given session.

        The returned callable serializes the team's chat history to a JSON
        string, supporting both the legacy ``TeamMemory`` backend and the v2
        ``Memory`` backend.
        """

        def get_team_history(num_chats: Optional[int] = None) -> str:
            """
            Use this function to get the team chat history.

            Args:
                num_chats: The number of chats to return.
                    Each chat contains 2 messages. One from the team and one from the user.
                    Default: None

            Returns:
                str: A JSON string containing a list of dictionaries representing the team chat history.

            Example:
                - To get the last chat, use num_chats=1
                - To get the last 5 chats, use num_chats=5
                - To get all chats, use num_chats=None
                - To get the first chat, use num_chats=None and take the first message
            """
            import json

            history: List[Dict[str, Any]] = []
            if isinstance(self.memory, TeamMemory):
                team_chats = self.memory.get_all_messages()

                if len(team_chats) == 0:
                    return ""

                # Walk backwards through the (user, team) message pairs so the
                # MOST RECENT chats are kept, stopping once num_chats is reached.
                chats_added = 0
                for chat in team_chats[::-1]:
                    history.insert(0, chat[1].to_dict())
                    history.insert(0, chat[0].to_dict())
                    chats_added += 1
                    if num_chats is not None and chats_added >= num_chats:
                        break

            elif isinstance(self.memory, Memory):
                all_chats = self.memory.get_messages_for_session(session_id=session_id)

                if len(all_chats) == 0:
                    return ""

                # Rebuild chronological order (insert-at-front of a reversed walk).
                for chat in all_chats[::-1]:  # type: ignore
                    history.insert(0, chat.to_dict())  # type: ignore

                if num_chats is not None:
                    # BUG FIX: previously sliced from the front (`[:num_chats]`),
                    # returning the OLDEST messages. Per the docstring, keep the
                    # most recent chats; each chat is a user/team pair, hence
                    # two messages per chat.
                    history = history[-num_chats * 2 :]

            else:
                return ""

            return json.dumps(history)

        return get_team_history

    def get_set_shared_context_function(self, session_id: str) -> Callable:
        """Build the `set_shared_context` tool for the given session.

        The returned callable writes the supplied state into the team context
        of whichever memory backend is in use and reports the resulting
        context back to the caller.
        """

        def set_shared_context(state: Union[str, dict]) -> str:
            """
            Set or update the team's shared context with the given state.

            Args:
                state (str or dict): The state to set as the team context.
            """
            # Always store the context as text: dicts are JSON-serialized.
            text = json.dumps(state) if isinstance(state, dict) else state
            if isinstance(self.memory, TeamMemory):
                self.memory.set_team_context_text(text)  # type: ignore
                msg = f"Current team context: {self.memory.get_team_context_str()}"  # type: ignore
            else:
                # BUG FIX: the Memory (v2) branch previously passed a raw dict
                # through as `text` while the TeamMemory branch serialized it.
                self.memory.set_team_context_text(session_id=session_id, text=text)  # type: ignore
                msg = f"Current team context: {self.memory.get_team_context_str(session_id=session_id)}"  # type: ignore
            log_debug(msg)  # type: ignore
            return msg

        return set_shared_context

    def _update_team_session_state(self, member_agent: Union[Agent, "Team"]) -> None:
        """Fold a member's ``team_session_state`` into the team's own state.

        Adopts the member's state wholesale when the team has none yet;
        otherwise deep-merges the member's state into the existing dict.
        Members without state are ignored.
        """
        member_state = member_agent.team_session_state
        if member_state is None:
            return
        if self.team_session_state is None:
            self.team_session_state = member_state
        else:
            merge_dictionaries(self.team_session_state, member_state)

    def get_run_member_agents_function(
        self,
        session_id: str,
        stream: bool = False,
        stream_intermediate_steps: bool = False,
        async_mode: bool = False,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        audio: Optional[List[Audio]] = None,
        files: Optional[List[File]] = None,
    ) -> Function:
        """Build the `run_member_agents` tool that fans one task out to ALL members.

        Returns an agno ``Function`` wrapping either a sync generator (which can
        stream member events) or, when ``async_mode`` is set, an async generator
        that runs every member concurrently via ``asyncio.gather`` (streaming is
        not possible in that mode).

        NOTE(review): the inner docstrings double as the tool description shown
        to the model, so their wording is intentionally left unchanged.
        """
        # Normalize media inputs so the closures below can safely extend them.
        if not images:
            images = []
        if not videos:
            videos = []
        if not audio:
            audio = []
        if not files:
            files = []

        def _determine_team_context(
            self, session_id: str
        ) -> Tuple[Optional[str], Optional[str]]:
            # Collect the shared team context and prior member interactions from
            # whichever memory backend is in use. As a side effect, media stored
            # in the team context is appended to the enclosing images/videos/audio
            # lists so it gets forwarded to the member agents.
            if isinstance(self.memory, TeamMemory):
                self.memory = cast(TeamMemory, self.memory)
                team_context_str = None
                if self.enable_agentic_context:
                    team_context_str = self.memory.get_team_context_str()

                team_member_interactions_str = None
                if self.share_member_interactions:
                    team_member_interactions_str = (
                        self.memory.get_team_member_interactions_str()
                    )
                    if context_images := self.memory.get_team_context_images():
                        images.extend(
                            [Image.from_artifact(img) for img in context_images]
                        )
                    if context_videos := self.memory.get_team_context_videos():
                        videos.extend(
                            [Video.from_artifact(vid) for vid in context_videos]
                        )
                    if context_audio := self.memory.get_team_context_audio():
                        audio.extend(
                            [Audio.from_artifact(aud) for aud in context_audio]
                        )
            else:
                self.memory = cast(Memory, self.memory)
                team_context_str = None
                if self.enable_agentic_context:
                    team_context_str = self.memory.get_team_context_str(
                        session_id=session_id
                    )  # type: ignore

                team_member_interactions_str = None
                if self.share_member_interactions:
                    team_member_interactions_str = (
                        self.memory.get_team_member_interactions_str(
                            session_id=session_id
                        )
                    )  # type: ignore
                    if context_images := self.memory.get_team_context_images(
                        session_id=session_id
                    ):  # type: ignore
                        images.extend(
                            [Image.from_artifact(img) for img in context_images]
                        )
                    if context_videos := self.memory.get_team_context_videos(
                        session_id=session_id
                    ):  # type: ignore
                        videos.extend(
                            [Video.from_artifact(vid) for vid in context_videos]
                        )
                    if context_audio := self.memory.get_team_context_audio(
                        session_id=session_id
                    ):  # type: ignore
                        audio.extend(
                            [Audio.from_artifact(aud) for aud in context_audio]
                        )
            return team_context_str, team_member_interactions_str

        def run_member_agents(
            task_description: str, expected_output: Optional[str] = None
        ) -> Iterator[Union[RunResponseEvent, TeamRunResponseEvent, str]]:
            """
            Send the same task to all the member agents and return the responses.

            Args:
                task_description (str): The task description to send to the member agents.
                expected_output (str): The expected output from the member agents.

            Returns:
                str: The responses from the member agents.
            """
            # Make sure for the member agent, we are using the agent logger
            use_agent_logger()
            self.memory = cast(TeamMemory, self.memory)

            # 2. Determine team context to send
            team_context_str, team_member_interactions_str = _determine_team_context(
                self, session_id
            )

            # 3. Create the member agent task
            member_agent_task = self._formate_member_agent_task(
                task_description,
                expected_output,
                team_context_str,
                team_member_interactions_str,
            )

            for member_agent_index, member_agent in enumerate(self.members):
                self._initialize_member(member_agent, session_id=session_id)

                if stream:
                    member_agent_run_response_stream = member_agent.run(
                        member_agent_task,
                        images=images,
                        videos=videos,
                        audio=audio,
                        files=files,
                        stream=True,
                        stream_intermediate_steps=stream_intermediate_steps,
                    )
                    for (
                        member_agent_run_response_chunk
                    ) in member_agent_run_response_stream:
                        check_if_run_cancelled(member_agent_run_response_chunk)
                        yield member_agent_run_response_chunk
                else:
                    member_agent_run_response = member_agent.run(
                        member_agent_task,
                        images=images,
                        videos=videos,
                        audio=audio,
                        files=files,
                        stream=False,
                    )

                    check_if_run_cancelled(member_agent_run_response)

                    try:
                        if member_agent_run_response.content is None and (
                            member_agent_run_response.tools is None
                            or len(member_agent_run_response.tools) == 0
                        ):
                            yield f"Agent {member_agent.name}: No response from the member agent."
                        elif isinstance(member_agent_run_response.content, str):
                            if len(member_agent_run_response.content.strip()) > 0:
                                yield f"Agent {member_agent.name}: {member_agent_run_response.content}"
                            elif (
                                member_agent_run_response.tools is not None
                                and len(member_agent_run_response.tools) > 0
                            ):
                                # Skip empty tool results so a None result cannot
                                # crash the join (matches the transfer-task tool).
                                yield f"Agent {member_agent.name}: {','.join([tool.result for tool in member_agent_run_response.tools if tool.result])}"  # type: ignore
                        elif issubclass(
                            type(member_agent_run_response.content), BaseModel
                        ):
                            yield f"Agent {member_agent.name}: {member_agent_run_response.content.model_dump_json(indent=2)}"  # type: ignore
                        else:
                            import json

                            yield f"Agent {member_agent.name}: {json.dumps(member_agent_run_response.content, indent=2)}"
                    except Exception as e:
                        yield f"Agent {member_agent.name}: Error - {str(e)}"

                # Update the memory
                member_name = (
                    member_agent.name
                    if member_agent.name
                    else f"agent_{member_agent_index}"
                )
                if isinstance(self.memory, TeamMemory):
                    self.memory = cast(TeamMemory, self.memory)
                    self.memory.add_interaction_to_team_context(
                        member_name=member_name,
                        task=task_description,
                        run_response=member_agent.run_response,  # type: ignore
                    )
                else:
                    self.memory = cast(Memory, self.memory)
                    self.memory.add_interaction_to_team_context(
                        session_id=session_id,
                        member_name=member_name,
                        task=task_description,
                        run_response=member_agent.run_response,  # type: ignore
                    )

                # Add the member run to the team run response
                self.run_response = cast(TeamRunResponse, self.run_response)
                self.run_response.add_member_run(member_agent.run_response)  # type: ignore

                # Update team session state
                self._update_team_session_state(member_agent)

                # Update the team media
                self._update_team_media(member_agent.run_response)  # type: ignore

            # Afterward, switch back to the team logger
            use_team_logger()

        async def arun_member_agents(
            task_description: str, expected_output: Optional[str] = None
        ) -> AsyncIterator[str]:
            """
            Send the same task to all the member agents and return the responses.

            Args:
                task_description (str): The task description to send to the member agents.
                expected_output (str): The expected output from the member agents.

            Returns:
                str: The responses from the member agents.
            """
            # Make sure for the member agent, we are using the agent logger
            use_agent_logger()
            self.memory = cast(TeamMemory, self.memory)

            # 2. Determine team context to send
            team_context_str, team_member_interactions_str = _determine_team_context(
                self, session_id
            )

            # 3. Create the member agent task
            member_agent_task = self._formate_member_agent_task(
                task_description,
                expected_output,
                team_context_str,
                team_member_interactions_str,
            )

            # Create tasks for all member agents
            tasks = []
            for member_agent_index, member_agent in enumerate(self.members):
                # We cannot stream responses with async gather
                current_agent = member_agent  # Create a reference to the current agent
                current_index = (
                    member_agent_index  # Create a reference to the current index
                )
                self._initialize_member(current_agent, session_id=session_id)

                # Bind agent/idx as defaults so each coroutine captures THIS
                # iteration's member (late-binding closures would otherwise see
                # only the last member once gather() runs them).
                async def run_member_agent(
                    agent=current_agent, idx=current_index
                ) -> str:
                    response = await agent.arun(
                        member_agent_task,
                        images=images,
                        videos=videos,
                        audio=audio,
                        files=files,
                        stream=False,
                    )
                    check_if_run_cancelled(response)

                    member_name = agent.name if agent.name else f"agent_{idx}"
                    if isinstance(self.memory, TeamMemory):
                        self.memory = cast(TeamMemory, self.memory)
                        self.memory.add_interaction_to_team_context(
                            member_name=member_name,
                            task=task_description,
                            run_response=agent.run_response,
                        )
                    else:
                        self.memory = cast(Memory, self.memory)
                        self.memory.add_interaction_to_team_context(
                            session_id=session_id,
                            member_name=member_name,
                            task=task_description,
                            run_response=agent.run_response,
                        )

                    # Add the member run to the team run response
                    self.run_response = cast(TeamRunResponse, self.run_response)
                    self.run_response.add_member_run(agent.run_response)

                    # Update team session state
                    # BUG FIX: previously used the loop variable `current_agent`,
                    # which by the time gather() runs always refers to the LAST
                    # member; use the bound `agent` instead.
                    self._update_team_session_state(agent)

                    # Update the team media
                    self._update_team_media(agent.run_response)

                    try:
                        if response.content is None and (
                            response.tools is None or len(response.tools) == 0
                        ):
                            return f"Agent {member_name}: No response from the member agent."
                        elif isinstance(response.content, str):
                            if len(response.content.strip()) > 0:
                                return f"Agent {member_name}: {response.content}"
                            elif response.tools is not None and len(response.tools) > 0:
                                # BUG FIX: tools are ToolExecution objects, not
                                # dicts — use `.result` (as the sync path does),
                                # not `.get('content')`.
                                return f"Agent {member_name}: {','.join([tool.result for tool in response.tools if tool.result])}"  # type: ignore
                        elif issubclass(type(response.content), BaseModel):
                            return f"Agent {member_name}: {response.content.model_dump_json(indent=2)}"  # type: ignore
                        else:
                            import json

                            return f"Agent {member_name}: {json.dumps(response.content, indent=2)}"
                    except Exception as e:
                        return f"Agent {member_name}: Error - {str(e)}"

                    # Fallback: empty string content with no usable tool results.
                    return f"Agent {member_name}: No Response"

                tasks.append(run_member_agent)

            # Need to collect and process yielded values from each task
            results = await asyncio.gather(*[task() for task in tasks])
            for result in results:
                yield result

            # Afterward, switch back to the team logger
            use_team_logger()

        if async_mode:
            run_member_agents_function = arun_member_agents  # type: ignore
        else:
            run_member_agents_function = run_member_agents  # type: ignore

        run_member_agents_func = Function.from_callable(
            run_member_agents_function, strict=True
        )

        return run_member_agents_func

    def get_transfer_task_function(
        self,
        session_id: str,
        stream: bool = False,
        stream_intermediate_steps: bool = False,
        async_mode: bool = False,
        images: Optional[List[Image]] = None,
        videos: Optional[List[Video]] = None,
        audio: Optional[List[Audio]] = None,
        files: Optional[List[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
    ) -> Function:
        if not images:
            images = []
        if not videos:
            videos = []
        if not audio:
            audio = []
        if not files:
            files = []

        def _determine_team_context(
            self, session_id: str
        ) -> Tuple[Optional[str], Optional[str]]:
            if isinstance(self.memory, TeamMemory):
                self.memory = cast(TeamMemory, self.memory)
                team_context_str = None
                if self.enable_agentic_context:
                    team_context_str = self.memory.get_team_context_str()

                team_member_interactions_str = None
                if self.share_member_interactions:
                    team_member_interactions_str = (
                        self.memory.get_team_member_interactions_str()
                    )
                    if context_images := self.memory.get_team_context_images():
                        images.extend(
                            [Image.from_artifact(img) for img in context_images]
                        )
                    if context_videos := self.memory.get_team_context_videos():
                        videos.extend(
                            [Video.from_artifact(vid) for vid in context_videos]
                        )
                    if context_audio := self.memory.get_team_context_audio():
                        audio.extend(
                            [Audio.from_artifact(aud) for aud in context_audio]
                        )
            else:
                self.memory = cast(Memory, self.memory)
                team_context_str = None
                if self.enable_agentic_context:
                    team_context_str = self.memory.get_team_context_str(
                        session_id=session_id
                    )  # type: ignore

                team_member_interactions_str = None
                if self.share_member_interactions:
                    team_member_interactions_str = (
                        self.memory.get_team_member_interactions_str(
                            session_id=session_id
                        )
                    )  # type: ignore
                    if context_images := self.memory.get_team_context_images(
                        session_id=session_id
                    ):  # type: ignore
                        images.extend(
                            [Image.from_artifact(img) for img in context_images]
                        )
                    if context_videos := self.memory.get_team_context_videos(
                        session_id=session_id
                    ):  # type: ignore
                        videos.extend(
                            [Video.from_artifact(vid) for vid in context_videos]
                        )
                    if context_audio := self.memory.get_team_context_audio(
                        session_id=session_id
                    ):  # type: ignore
                        audio.extend(
                            [Audio.from_artifact(aud) for aud in context_audio]
                        )
            return team_context_str, team_member_interactions_str

        def transfer_task_to_member(
            member_id: str, task_description: str, expected_output: Optional[str] = None
        ) -> Iterator[Union[RunResponseEvent, TeamRunResponseEvent, str]]:
            """Use this function to transfer a task to the selected team member.
            You must provide a clear and concise description of the task the member should achieve AND the expected output.

            Args:
                member_id (str): The ID of the member to transfer the task to.
                task_description (str): A clear and concise description of the task the member should achieve.
                expected_output (str): The expected output from the member (optional).
            Returns:
                str: The result of the delegated task.
            """
            # 1. Find the member agent using the helper function
            result = self._find_member_by_id(member_id)
            if result is None:
                yield f"Member with ID {member_id} not found in the team or any subteams. Please choose the correct member from the list of members:\n\n{self.get_members_system_message_content(indent=0)}"
                return

            member_agent_index, member_agent = result
            self._initialize_member(member_agent, session_id=session_id)

            # 2. Determine team context to send
            team_context_str, team_member_interactions_str = _determine_team_context(
                self, session_id
            )

            # 3. Create the member agent task
            member_agent_task = self._formate_member_agent_task(
                task_description,
                expected_output,
                team_context_str,
                team_member_interactions_str,
            )

            # Make sure for the member agent, we are using the agent logger
            use_agent_logger()

            # Handle enable_agentic_knowledge_filters on the member agent
            if (
                self.enable_agentic_knowledge_filters
                and not member_agent.enable_agentic_knowledge_filters
            ):
                member_agent.enable_agentic_knowledge_filters = (
                    self.enable_agentic_knowledge_filters
                )

            if stream:
                member_agent_run_response_stream = member_agent.run(
                    member_agent_task,
                    images=images,
                    videos=videos,
                    audio=audio,
                    files=files,
                    stream=True,
                    stream_intermediate_steps=stream_intermediate_steps,
                    knowledge_filters=knowledge_filters
                    if not member_agent.knowledge_filters and member_agent.knowledge
                    else None,
                )
                for member_agent_run_response_event in member_agent_run_response_stream:
                    check_if_run_cancelled(member_agent_run_response_event)

                    # Yield the member event directly
                    yield member_agent_run_response_event
            else:
                member_agent_run_response = member_agent.run(
                    member_agent_task,
                    images=images,
                    videos=videos,
                    audio=audio,
                    files=files,
                    stream=False,
                    knowledge_filters=knowledge_filters
                    if not member_agent.knowledge_filters and member_agent.knowledge
                    else None,
                )

                check_if_run_cancelled(member_agent_run_response)

                try:
                    if member_agent_run_response.content is None and (
                        member_agent_run_response.tools is None
                        or len(member_agent_run_response.tools) == 0
                    ):
                        yield "No response from the member agent."
                    elif isinstance(member_agent_run_response.content, str):
                        content = member_agent_run_response.content.strip()
                        if len(content) > 0:
                            yield content

                        # If the content is empty but we have tool calls
                        elif (
                            member_agent_run_response.tools is not None
                            and len(member_agent_run_response.tools) > 0
                        ):
                            tool_str = ""
                            for tool in member_agent_run_response.tools:
                                if tool.result:
                                    tool_str += f"{tool.result},"
                            yield tool_str.rstrip(",")

                    elif issubclass(type(member_agent_run_response.content), BaseModel):
                        yield member_agent_run_response.content.model_dump_json(
                            indent=2
                        )  # type: ignore
                    else:
                        import json

                        yield json.dumps(member_agent_run_response.content, indent=2)
                except Exception as e:
                    yield str(e)

            # Afterward, switch back to the team logger
            use_team_logger()

            # Update the memory
            member_name = (
                member_agent.name
                if member_agent.name
                else f"agent_{member_agent_index}"
            )

            if isinstance(self.memory, TeamMemory):
                self.memory = cast(TeamMemory, self.memory)
                self.memory.add_interaction_to_team_context(
                    member_name=member_name,
                    task=task_description,
                    run_response=member_agent.run_response,  # type: ignore
                )
            else:
                self.memory = cast(Memory, self.memory)
                self.memory.add_interaction_to_team_context(
                    session_id=session_id,
                    member_name=member_name,
                    task=task_description,
                    run_response=member_agent.run_response,  # type: ignore
                )

            # Add the member run to the team run response
            self.run_response = cast(TeamRunResponse, self.run_response)
            self.run_response.add_member_run(member_agent.run_response)  # type: ignore

            # Update team session state
            self._update_team_session_state(member_agent)

            # Update the team media
            self._update_team_media(member_agent.run_response)  # type: ignore

        async def atransfer_task_to_member(
            member_id: str, task_description: str, expected_output: Optional[str] = None
        ) -> AsyncIterator[Union[RunResponseEvent, TeamRunResponseEvent, str]]:
            """Use this function to transfer a task to the selected team member.
            You must provide a clear and concise description of the task the member should achieve AND the expected output.

            Args:
                member_id (str): The ID of the member to transfer the task to.
                task_description (str): A clear and concise description of the task the member should achieve.
                expected_output (str): The expected output from the member (optional).
            Returns:
                str: The result of the delegated task.
            """
            # NOTE: this docstring doubles as the tool description presented to the model.

            # Find the member agent using the helper function
            result = self._find_member_by_id(member_id)
            if result is None:
                # Tell the model which IDs are valid so it can retry with a correct one.
                yield f"Member with ID {member_id} not found in the team or any subteams. Please choose the correct member from the list of members:\n\n{self.get_members_system_message_content(indent=0)}"
                return

            member_agent_index, member_agent = result
            self._initialize_member(member_agent, session_id=session_id)

            # 2. Determine team context to send
            team_context_str, team_member_interactions_str = _determine_team_context(
                self, session_id
            )

            # 3. Create the member agent task
            member_agent_task = self._formate_member_agent_task(
                task_description,
                expected_output,
                team_context_str,
                team_member_interactions_str,
            )

            # Make sure for the member agent, we are using the agent logger
            use_agent_logger()

            # Handle enable_agentic_knowledge_filters
            if (
                self.enable_agentic_knowledge_filters
                and not member_agent.enable_agentic_knowledge_filters
            ):
                member_agent.enable_agentic_knowledge_filters = (
                    self.enable_agentic_knowledge_filters
                )

            if stream:
                # Streaming: relay the member's events straight through to the caller.
                member_agent_run_response_stream = await member_agent.arun(
                    member_agent_task,
                    images=images,
                    videos=videos,
                    audio=audio,
                    files=files,
                    stream=True,
                    stream_intermediate_steps=stream_intermediate_steps,
                    # Only pass team-level filters when the member has knowledge
                    # but no filters of its own.
                    knowledge_filters=knowledge_filters
                    if not member_agent.knowledge_filters and member_agent.knowledge
                    else None,
                )
                async for (
                    member_agent_run_response_event
                ) in member_agent_run_response_stream:
                    check_if_run_cancelled(member_agent_run_response_event)
                    yield member_agent_run_response_event
            else:
                member_agent_run_response = await member_agent.arun(
                    member_agent_task,
                    images=images,
                    videos=videos,
                    audio=audio,
                    files=files,
                    stream=False,
                    knowledge_filters=knowledge_filters
                    if not member_agent.knowledge_filters and member_agent.knowledge
                    else None,
                )
                check_if_run_cancelled(member_agent_run_response)

                # Render the member's response as text for the team leader.
                # Any rendering error is yielded as its string form rather than raised.
                try:
                    if member_agent_run_response.content is None and (
                        member_agent_run_response.tools is None
                        or len(member_agent_run_response.tools) == 0
                    ):
                        yield "No response from the member agent."
                    elif isinstance(member_agent_run_response.content, str):
                        if len(member_agent_run_response.content.strip()) > 0:
                            yield member_agent_run_response.content

                        # If the content is empty but we have tool calls
                        elif (
                            member_agent_run_response.tools is not None
                            and len(member_agent_run_response.tools) > 0
                        ):
                            yield ",".join(
                                [
                                    tool.result
                                    for tool in member_agent_run_response.tools
                                    if tool.result
                                ]
                            )  # type: ignore
                    elif issubclass(type(member_agent_run_response.content), BaseModel):
                        # Structured output: serialize the pydantic model as JSON.
                        yield member_agent_run_response.content.model_dump_json(
                            indent=2
                        )  # type: ignore
                    else:
                        import json

                        yield json.dumps(member_agent_run_response.content, indent=2)
                except Exception as e:
                    yield str(e)

            # Afterward, switch back to the team logger
            use_team_logger()

            # Update the memory
            member_name = (
                member_agent.name
                if member_agent.name
                else f"agent_{member_agent_index}"
            )
            if isinstance(self.memory, TeamMemory):
                # Legacy TeamMemory: interactions are not session-scoped.
                self.memory = cast(TeamMemory, self.memory)
                self.memory.add_interaction_to_team_context(
                    member_name=member_name,
                    task=task_description,
                    run_response=member_agent.run_response,  # type: ignore
                )
            else:
                # v2 Memory keys the interaction by session_id.
                self.memory = cast(Memory, self.memory)
                self.memory.add_interaction_to_team_context(
                    session_id=session_id,
                    member_name=member_name,
                    task=task_description,
                    run_response=member_agent.run_response,  # type: ignore
                )

            # Add the member run to the team run response
            self.run_response = cast(TeamRunResponse, self.run_response)
            self.run_response.add_member_run(member_agent.run_response)  # type: ignore

            # Update team session state
            self._update_team_session_state(member_agent)

            # Update the team media
            self._update_team_media(member_agent.run_response)  # type: ignore

        if async_mode:
            transfer_function = atransfer_task_to_member  # type: ignore
        else:
            transfer_function = transfer_task_to_member  # type: ignore

        transfer_func = Function.from_callable(transfer_function, strict=True)

        return transfer_func

    def _formate_member_agent_task(
        self,
        task_description: str,
        expected_output: Optional[str] = None,
        team_context_str: Optional[str] = None,
        team_member_interactions_str: Optional[str] = None,
    ) -> str:
        member_agent_task = "You are a member of a team of agents. Your goal is to complete the following task:"
        member_agent_task += f"\n\n<task>\n{task_description}\n</task>"

        if expected_output is not None:
            member_agent_task += (
                f"\n\n<expected_output>\n{expected_output}\n</expected_output>"
            )

        if team_context_str:
            member_agent_task += f"\n\n{team_context_str}"
        if team_member_interactions_str:
            member_agent_task += f"\n\n{team_member_interactions_str}"

        return member_agent_task

    def _get_member_id(self, member: Union[Agent, "Team"]) -> Optional[str]:
        """
        Get the URL-safe ID of a member.

        Prefers an explicitly-set, non-UUID agent_id/team_id; falls back to the
        member's name. Returns None when neither is usable (e.g. an unnamed
        member whose ID is an auto-generated UUID).
        """
        # An agent with a user-supplied (non-UUID) agent_id: derive the ID from it.
        if (
            isinstance(member, Agent)
            and member.agent_id is not None
            and (not is_valid_uuid(member.agent_id))
        ):
            url_safe_member_id = url_safe_string(member.agent_id)
        # Same rule for a subteam with a user-supplied (non-UUID) team_id.
        elif (
            isinstance(member, Team)
            and member.team_id is not None
            and (not is_valid_uuid(member.team_id))
        ):
            url_safe_member_id = url_safe_string(member.team_id)
        # Otherwise fall back to the member's name.
        elif member.name is not None:
            url_safe_member_id = url_safe_string(member.name)
        else:
            # No usable identifier — return type widened to Optional[str] to match.
            url_safe_member_id = None
        return url_safe_member_id

    def _find_member_by_id(
        self, member_id: str
    ) -> Optional[Tuple[int, Union[Agent, "Team"]]]:
        """
        Recursively search direct members and subteams for a member by its ID.

        Args:
            member_id (str): The URL-safe ID of the member to find.

        Returns:
            Optional[Tuple[int, Union[Agent, "Team"]]]: A tuple of
                (index of the member in this team, the member itself); when the
                match is inside a subteam, the index and instance of the direct
                subteam that contains it are returned instead. None if no
                member matches.
        """
        # First check direct members
        for i, member in enumerate(self.members):
            # A member's ID may derive from its agent_id/team_id even when it
            # has no name, so compute it unconditionally (previously members
            # without a name were skipped here).
            if self._get_member_id(member) == member_id:
                return i, member

            # If this member is a team, search its members recursively
            if isinstance(member, Team):
                if member._find_member_by_id(member_id) is not None:
                    # Found in a subteam: return the containing subteam so the
                    # task can be delegated through it.
                    return i, member

        return None

    def get_forward_task_function(
        self,
        message: Message,
        session_id: str,
        stream: bool = False,
        stream_intermediate_steps: bool = False,
        async_mode: bool = False,
        images: Optional[Sequence[Image]] = None,
        videos: Optional[Sequence[Video]] = None,
        audio: Optional[Sequence[Audio]] = None,
        files: Optional[Sequence[File]] = None,
        knowledge_filters: Optional[Dict[str, Any]] = None,
    ) -> Function:
        """Build the tool the team leader uses to forward a task to one member.

        The generated tool forwards the original user message (plus an optional
        expected output) to the selected member and relays the member's
        response directly back to the caller. `async_mode` selects the sync or
        async variant.

        Args:
            message: The user message to forward to the member.
            session_id: The current team session ID.
            stream: Whether the member run should stream response events.
            stream_intermediate_steps: Whether to also stream intermediate steps.
            async_mode: If True, wrap the async variant of the tool.
            images: Images to pass through to the member run.
            videos: Videos to pass through to the member run.
            audio: Audio to pass through to the member run.
            files: Files to pass through to the member run.
            knowledge_filters: Team-level knowledge filters, applied only when
                the member has knowledge but no filters of its own.

        Returns:
            Function: A strict `Function` that stops the leader run after the
            tool call and shows the member's result directly.
        """
        # Normalize media arguments so member runs always receive sequences.
        if not images:
            images = []
        if not videos:
            videos = []
        if not audio:
            audio = []
        if not files:
            files = []

        def _member_knowledge_filters(member_agent) -> Optional[Dict[str, Any]]:
            """Only pass team-level filters when the member has knowledge but no filters of its own."""
            if not member_agent.knowledge_filters and member_agent.knowledge:
                return knowledge_filters
            return None

        def _member_response_to_text(member_agent_run_response) -> Optional[str]:
            """Render a member's non-streamed run response as text for the leader.

            Returns None when there is nothing to relay (empty string content and
            no tool results). Rendering errors are returned as their string
            representation instead of being raised.
            """
            try:
                content = member_agent_run_response.content
                tools = member_agent_run_response.tools
                if content is None and not tools:
                    return "No response from the member agent."
                if isinstance(content, str):
                    if content.strip():
                        return content
                    # Content is empty but tool results may still be relayed.
                    if tools:
                        return ",".join(
                            tool.result for tool in tools if tool.result
                        )
                    return None
                if isinstance(content, BaseModel):
                    # Structured output: serialize the pydantic model as JSON.
                    return content.model_dump_json(indent=2)
                return json.dumps(content, indent=2)
            except Exception as e:
                return str(e)

        def _record_member_interaction(member_agent, member_agent_index) -> None:
            """Post-run bookkeeping: memory, team run response, session state, media."""
            member_name = (
                member_agent.name
                if member_agent.name
                else f"agent_{member_agent_index}"
            )
            if isinstance(self.memory, TeamMemory):
                self.memory = cast(TeamMemory, self.memory)
                self.memory.add_interaction_to_team_context(
                    member_name=member_name,
                    task=message.get_content_string(),
                    run_response=member_agent.run_response,  # type: ignore
                )
            else:
                self.memory = cast(Memory, self.memory)
                self.memory.add_interaction_to_team_context(
                    session_id=session_id,  # type: ignore
                    member_name=member_name,
                    task=message.get_content_string(),
                    run_response=member_agent.run_response,  # type: ignore
                )

            # Add the member run to the team run response
            self.run_response = cast(TeamRunResponse, self.run_response)
            self.run_response.add_member_run(member_agent.run_response)  # type: ignore

            # Update team session state and accumulated media
            self._update_team_session_state(member_agent)
            self._update_team_media(member_agent.run_response)  # type: ignore

        def forward_task_to_member(
            member_id: str, expected_output: Optional[str] = None
        ) -> Iterator[Union[RunResponseEvent, TeamRunResponseEvent, str]]:
            """Use this function to forward the request to the selected team member.
            Args:
                member_id (str): The ID of the member to transfer the task to.
                expected_output (str): The expected output from the member (optional).
            Returns:
                str: The result of the delegated task.
            """
            self._member_response_model = None

            # Find the member agent using the helper function
            result = self._find_member_by_id(member_id)
            if result is None:
                yield f"Member with ID {member_id} not found in the team or any subteams. Please choose the correct member from the list of members:\n\n{self.get_members_system_message_content(indent=0)}"
                return

            member_agent_index, member_agent = result
            self._initialize_member(member_agent, session_id=session_id)

            # Since we return the response directly from the member agent, we need to set the response model from the team down.
            if not member_agent.response_model and self.response_model:
                member_agent.response_model = self.response_model

            # If the member will produce structured output, we need to parse the response
            if member_agent.response_model is not None:
                self._member_response_model = member_agent.response_model

            # Make sure for the member agent, we are using the agent logger
            use_agent_logger()

            member_agent_task = message.get_content_string()
            if expected_output:
                member_agent_task += (
                    f"\n\n<expected_output>\n{expected_output}\n</expected_output>"
                )

            # Handle enable_agentic_knowledge_filters
            if (
                self.enable_agentic_knowledge_filters
                and not member_agent.enable_agentic_knowledge_filters
            ):
                member_agent.enable_agentic_knowledge_filters = (
                    self.enable_agentic_knowledge_filters
                )

            # Get the response from the member agent
            if stream:
                member_agent_run_response_stream = member_agent.run(
                    member_agent_task,
                    images=images,
                    videos=videos,
                    audio=audio,
                    files=files,
                    stream=True,
                    stream_intermediate_steps=stream_intermediate_steps,
                    knowledge_filters=_member_knowledge_filters(member_agent),
                )
                for member_agent_run_response_chunk in member_agent_run_response_stream:
                    check_if_run_cancelled(member_agent_run_response_chunk)
                    yield member_agent_run_response_chunk
            else:
                member_agent_run_response = member_agent.run(
                    member_agent_task,
                    images=images,
                    videos=videos,
                    audio=audio,
                    files=files,
                    stream=False,
                    knowledge_filters=_member_knowledge_filters(member_agent),
                )
                check_if_run_cancelled(member_agent_run_response)
                response_text = _member_response_to_text(member_agent_run_response)
                if response_text is not None:
                    yield response_text

            # Afterward, switch back to the team logger
            use_team_logger()

            # Update memory, run response, session state and media
            _record_member_interaction(member_agent, member_agent_index)

        async def aforward_task_to_member(
            member_id: str, expected_output: Optional[str] = None
        ) -> AsyncIterator[Union[RunResponseEvent, TeamRunResponseEvent, str]]:
            """Use this function to forward a message to the selected team member.

            Args:
                member_id (str): The ID of the member to transfer the task to.
                expected_output (str): The expected output from the member (optional).
            Returns:
                str: The result of the delegated task.
            """
            self._member_response_model = None

            # Find the member agent using the helper function
            result = self._find_member_by_id(member_id)
            if result is None:
                yield f"Member with ID {member_id} not found in the team or any subteams. Please choose the correct member from the list of members:\n\n{self.get_members_system_message_content(indent=0)}"
                return

            member_agent_index, member_agent = result
            self._initialize_member(member_agent, session_id=session_id)

            # Since we return the response directly from the member agent, we need to set the response model from the team down.
            # Fix: previously only the sync variant propagated the team's response model.
            if not member_agent.response_model and self.response_model:
                member_agent.response_model = self.response_model

            # If the member will produce structured output, we need to parse the response
            if member_agent.response_model is not None:
                self._member_response_model = member_agent.response_model

            # Make sure for the member agent, we are using the agent logger
            use_agent_logger()

            member_agent_task = message.get_content_string()
            if expected_output:
                member_agent_task += (
                    f"\n\n<expected_output>\n{expected_output}\n</expected_output>"
                )

            # Handle enable_agentic_knowledge_filters
            if (
                self.enable_agentic_knowledge_filters
                and not member_agent.enable_agentic_knowledge_filters
            ):
                member_agent.enable_agentic_knowledge_filters = (
                    self.enable_agentic_knowledge_filters
                )

            # Get the response from the member agent
            if stream:
                member_agent_run_response_stream = await member_agent.arun(
                    member_agent_task,
                    images=images,
                    videos=videos,
                    audio=audio,
                    files=files,
                    stream=True,
                    stream_intermediate_steps=stream_intermediate_steps,
                    knowledge_filters=_member_knowledge_filters(member_agent),
                )
                async for (
                    member_agent_run_response_event
                ) in member_agent_run_response_stream:
                    check_if_run_cancelled(member_agent_run_response_event)
                    yield member_agent_run_response_event
            else:
                # Fix: filters were previously passed only when the member already
                # had its own filters (condition inverted vs. every other call site).
                member_agent_run_response = await member_agent.arun(
                    member_agent_task,
                    images=images,
                    videos=videos,
                    audio=audio,
                    files=files,
                    stream=False,
                    knowledge_filters=_member_knowledge_filters(member_agent),
                )
                # Fix: cancellation was not checked on the non-streamed async path.
                check_if_run_cancelled(member_agent_run_response)
                response_text = _member_response_to_text(member_agent_run_response)
                if response_text is not None:
                    yield response_text

            # Afterward, switch back to the team logger
            use_team_logger()

            # Update memory, run response, session state and media
            _record_member_interaction(member_agent, member_agent_index)

        if async_mode:
            forward_function = aforward_task_to_member  # type: ignore
        else:
            forward_function = forward_task_to_member  # type: ignore

        forward_func = Function.from_callable(forward_function, strict=True)

        # Forwarding ends the leader's turn: stop after this tool call and show
        # the member's response directly.
        forward_func.stop_after_tool_call = True
        forward_func.show_result = True

        return forward_func

    ###########################################################################
    # Storage
    ###########################################################################

    def read_from_storage(self, session_id: str) -> Optional[TeamSession]:
        """Load the TeamSession for `session_id` from storage.

        Returns:
            Optional[TeamSession]: The loaded TeamSession or None if not found.
        """
        # Nothing to read without a storage backend or a session ID.
        if self.storage is None or session_id is None:
            return self.team_session

        self.team_session = cast(
            TeamSession, self.storage.read(session_id=session_id)
        )
        if self.team_session is None:
            # Brand-new session: reset session-scoped state.
            self.session_name = None
        else:
            # Hydrate team state from the stored session.
            self.load_team_session(session=self.team_session)
        return self.team_session

    def write_to_storage(
        self, session_id: str, user_id: Optional[str] = None
    ) -> Optional[TeamSession]:
        """Persist the current TeamSession to storage.

        Returns:
            Optional[TeamSession]: The saved TeamSession, or None if nothing
            was saved (e.g. no storage backend configured).
        """
        if self.storage is None:
            return self.team_session

        # Build the session snapshot, upsert it, and keep the stored copy.
        session_snapshot = self._get_team_session(
            session_id=session_id, user_id=user_id
        )
        self.team_session = cast(
            TeamSession, self.storage.upsert(session=session_snapshot)
        )
        return self.team_session

    def rename_session(
        self, session_name: str, session_id: Optional[str] = None
    ) -> None:
        """Rename the current session and persist the new name to storage.

        Raises:
            ValueError: If neither `session_id` nor `self.session_id` is set.
        """
        if session_id is None and self.session_id is None:
            raise ValueError("Session ID is not initialized")

        sid = session_id if session_id else self.session_id

        # Load the stored session state, apply the new name, then save and log.
        self.read_from_storage(session_id=sid)  # type: ignore
        self.session_name = session_name
        self.write_to_storage(session_id=sid, user_id=self.user_id)  # type: ignore
        self._log_team_session(session_id=sid, user_id=self.user_id)  # type: ignore

    def delete_session(self, session_id: str) -> None:
        """Delete the given session from storage, if a storage backend is configured."""
        if self.storage is None:
            return
        self.storage.delete_session(session_id=session_id)

    def load_team_session(self, session: TeamSession):
        """Load the existing TeamSession from an TeamSession (from the database)"""
        from agno.utils.merge_dict import merge_dictionaries

        # Get the team_id, user_id and session_id from the database
        if self.team_id is None and session.team_id is not None:
            self.team_id = session.team_id
        if self.user_id is None and session.user_id is not None:
            self.user_id = session.user_id
        if self.session_id is None and session.session_id is not None:
            self.session_id = session.session_id

        # Read team_data from the database
        if session.team_data is not None:
            # Get name from database and update the team name if not set
            if self.name is None and "name" in session.team_data:
                self.name = session.team_data.get("name")

        # Read session_data from the database
        if session.session_data is not None:
            # Get the session_name from database and update the current session_name if not set
            if self.session_name is None and "session_name" in session.session_data:
                self.session_name = session.session_data.get("session_name")

            # Get the session_state from the database and update the current session_state
            if "session_state" in session.session_data:
                session_state_from_db = session.session_data.get("session_state")
                if (
                    session_state_from_db is not None
                    and isinstance(session_state_from_db, dict)
                    and len(session_state_from_db) > 0
                ):
                    # If the session_state is already set, merge the session_state from the database with the current session_state
                    if self.session_state is not None and len(self.session_state) > 0:
                        # This updates session_state_from_db
                        merge_dictionaries(session_state_from_db, self.session_state)
                    # Update the current session_state
                    self.session_state = session_state_from_db

            if "team_session_state" in session.session_data:
                team_session_state_from_db = session.session_data.get(
                    "team_session_state"
                )
                if (
                    team_session_state_from_db is not None
                    and isinstance(team_session_state_from_db, dict)
                    and len(team_session_state_from_db) > 0
                ):
                    # If the team_session_state is already set, merge the team_session_state from the database with the current team_session_state
                    if (
                        self.team_session_state is not None
                        and len(self.team_session_state) > 0
                    ):
                        # This updates team_session_state_from_db
                        merge_dictionaries(
                            team_session_state_from_db, self.team_session_state
                        )
                    # Update the current team_session_state
                    self.team_session_state = team_session_state_from_db

            # Get the session_metrics from the database
            if "session_metrics" in session.session_data:
                session_metrics_from_db = session.session_data.get("session_metrics")
                if session_metrics_from_db is not None and isinstance(
                    session_metrics_from_db, dict
                ):
                    self.session_metrics = SessionMetrics(**session_metrics_from_db)

            # Get images, videos, and audios from the database
            if "images" in session.session_data:
                images_from_db = session.session_data.get("images")
                if images_from_db is not None and isinstance(images_from_db, list):
                    if self.images is None:
                        self.images = []
                    self.images.extend(
                        [ImageArtifact.model_validate(img) for img in images_from_db]
                    )
            if "videos" in session.session_data:
                videos_from_db = session.session_data.get("videos")
                if videos_from_db is not None and isinstance(videos_from_db, list):
                    if self.videos is None:
                        self.videos = []
                    self.videos.extend(
                        [VideoArtifact.model_validate(vid) for vid in videos_from_db]
                    )
            if "audio" in session.session_data:
                audio_from_db = session.session_data.get("audio")
                if audio_from_db is not None and isinstance(audio_from_db, list):
                    if self.audio is None:
                        self.audio = []
                    self.audio.extend(
                        [AudioArtifact.model_validate(aud) for aud in audio_from_db]
                    )

        # Read extra_data from the database
        if session.extra_data is not None:
            # If extra_data is set in the agent, update the database extra_data with the agent's extra_data
            if self.extra_data is not None:
                # Updates agent_session.extra_data in place
                merge_dictionaries(session.extra_data, self.extra_data)
            # Update the current extra_data with the extra_data from the database which is updated in place
            self.extra_data = session.extra_data

        if self.memory is None:
            self.memory = session.memory  # type: ignore

        if not (isinstance(self.memory, TeamMemory) or isinstance(self.memory, Memory)):
            # Is it a dict of `TeamMemory`?
            if isinstance(self.memory, dict) and "create_user_memories" in self.memory:
                # Convert dict to TeamMemory
                self.memory = TeamMemory(**self.memory)
            else:
                # Default to base memory
                self.memory = Memory()

        if session.memory is not None:
            if isinstance(self.memory, TeamMemory):
                try:
                    if "runs" in session.memory:
                        try:
                            self.memory.runs = [
                                TeamRun.from_dict(m) for m in session.memory["runs"]
                            ]
                        except Exception as e:
                            log_warning(f"Failed to load runs from memory: {e}")
                    if "messages" in session.memory:
                        try:
                            self.memory.messages = [
                                Message.model_validate(m)
                                for m in session.memory["messages"]
                            ]
                        except Exception as e:
                            log_warning(f"Failed to load messages from memory: {e}")
                    if "memories" in session.memory:
                        from agno.memory.memory import Memory as UserMemoryV1

                        try:
                            self.memory.memories = [
                                UserMemoryV1.model_validate(m)
                                for m in session.memory["memories"]
                            ]
                        except Exception as e:
                            log_warning(f"Failed to load user memories: {e}")

                    if self.memory.create_user_memories:
                        if self.user_id is not None and self.memory.user_id is None:
                            self.memory.user_id = self.user_id

                        self.memory.load_user_memories()
                        if self.user_id is not None:
                            log_debug(f"Memories loaded for user: {self.user_id}")
                        else:
                            log_debug("Memories loaded")

                except Exception as e:
                    log_warning(f"Failed to load TeamMemory: {e}")
            elif isinstance(self.memory, Memory):
                if "runs" in session.memory:
                    try:
                        if self.memory.runs is None:
                            self.memory.runs = {}
                        self.memory.runs[session.session_id] = []
                        for run in session.memory["runs"]:
                            run_session_id = run["session_id"]
                            if "team_id" in run:
                                self.memory.runs[run_session_id].append(
                                    TeamRunResponse.from_dict(run)
                                )
                            else:
                                self.memory.runs[run_session_id].append(
                                    RunResponse.from_dict(run)
                                )
                    except Exception as e:
                        log_warning(f"Failed to load runs from memory: {e}")
                if "team_context" in session.memory:
                    from agno.memory.v2.memory import TeamContext

                    try:
                        self.memory.team_context = {
                            session_id: TeamContext.from_dict(team_context)
                            for session_id, team_context in session.memory[
                                "team_context"
                            ].items()
                        }
                    except Exception as e:
                        log_warning(f"Failed to load team context: {e}")
                if "memories" in session.memory:
                    if self.memory.memories is not None:
                        pass
                    else:
                        from agno.memory.v2.memory import UserMemory as UserMemoryV2

                        try:
                            self.memory.memories = {
                                user_id: {
                                    memory_id: UserMemoryV2.from_dict(memory)
                                    for memory_id, memory in user_memories.items()
                                }
                                for user_id, user_memories in session.memory[
                                    "memories"
                                ].items()
                            }
                        except Exception as e:
                            log_warning(f"Failed to load user memories: {e}")
                if "summaries" in session.memory:
                    if self.memory.summaries is not None:
                        pass
                    else:
                        from agno.memory.v2.memory import (
                            SessionSummary as SessionSummaryV2,
                        )

                        try:
                            self.memory.summaries = {
                                user_id: {
                                    session_id: SessionSummaryV2.from_dict(summary)
                                    for session_id, summary in user_session_summaries.items()
                                }
                                for user_id, user_session_summaries in session.memory[
                                    "summaries"
                                ].items()
                            }
                        except Exception as e:
                            log_warning(f"Failed to load session summaries: {e}")
        log_debug(f"-*- TeamSession loaded: {session.session_id}")

    def load_session(self, force: bool = False) -> Optional[str]:
        """Load an existing session from the database and return the session_id.
        If a session does not exist, create a new session.

        - If a session exists in the database, load the session.
        - If a session does not exist in the database, create a new session.
        """
        # Fast path: reuse the already-loaded team_session when it matches the
        # requested session_id (unless the caller forces a reload).
        if not force and self.team_session is not None:
            matches_current = (
                self.session_id is not None
                and self.team_session.session_id == self.session_id
            )
            if matches_current:
                return self.team_session.session_id

        if self.storage is not None:
            # Try to load an existing session from storage first
            log_debug(f"Reading TeamSession: {self.session_id}")
            self.read_from_storage(session_id=self.session_id)  # type: ignore

            if self.team_session is None:
                # Nothing in storage: create and persist a brand-new session
                log_debug("-*- Creating new TeamSession")
                if not self.session_id:
                    self.session_id = str(uuid4())
                if self.team_id is None:
                    self.initialize_team(session_id=self.session_id)
                # write_to_storage() creates a new TeamSession and populates
                # self.team_session with it
                self.write_to_storage(session_id=self.session_id, user_id=self.user_id)  # type: ignore
                if self.team_session is None:
                    raise Exception("Failed to create new TeamSession in storage")
                log_debug(f"-*- Created TeamSession: {self.team_session.session_id}")
                self._log_team_session(session_id=self.session_id, user_id=self.user_id)  # type: ignore
        return self.session_id

    def get_messages_for_session(
        self, session_id: Optional[str] = None, user_id: Optional[str] = None
    ) -> List[Message]:
        """Return the messages stored in memory for the given session (or the current one)."""
        _session_id = session_id or self.session_id
        _user_id = user_id or self.user_id  # resolved for parity; not used below
        if _session_id is None:
            log_warning("Session ID is not set, cannot get messages for session")
            return []

        # Lazily hydrate memory from storage when it has not been loaded yet
        if self.memory is None:
            self.read_from_storage(session_id=_session_id)
        if self.memory is None:
            return []

        if isinstance(self.memory, AgentMemory):
            return self.memory.messages
        if isinstance(self.memory, Memory):
            return self.memory.get_messages_from_last_n_runs(session_id=_session_id)
        return []

    def get_session_summary(
        self, session_id: Optional[str] = None, user_id: Optional[str] = None
    ):
        """Return the session summary for the given session ID and user ID.

        Falls back to self.session_id / self.user_id ("default" user) when the
        arguments are omitted. Only supported for Memory (v2).
        """
        if self.memory is None:
            return None

        if session_id is None:
            session_id = self.session_id
        if session_id is None:
            raise ValueError("Session ID is required")

        if not isinstance(self.memory, Memory):
            if isinstance(self.memory, TeamMemory):
                raise ValueError("TeamMemory does not support get_session_summary")
            raise ValueError(f"Memory type {type(self.memory)} not supported")

        if user_id is None:
            user_id = self.user_id
        if user_id is None:
            user_id = "default"
        return self.memory.get_session_summary(session_id=session_id, user_id=user_id)

    def get_user_memories(self, user_id: Optional[str] = None):
        """Return the stored user memories for the given user ID.

        Falls back to self.user_id, then "default". Only supported for Memory (v2).
        """
        if self.memory is None:
            return None

        if user_id is None:
            user_id = self.user_id
        if user_id is None:
            user_id = "default"

        if isinstance(self.memory, Memory):
            return self.memory.get_user_memories(user_id=user_id)
        if isinstance(self.memory, TeamMemory):
            raise ValueError("TeamMemory does not support get_user_memories")
        raise ValueError(f"Memory type {type(self.memory)} not supported")

    ###########################################################################
    # Handle images, videos and audio
    ###########################################################################

    def add_image(self, image: ImageArtifact) -> None:
        """Track a generated image on the team and mirror it onto the active run response."""
        if self.images is None:
            self.images = []
        self.images.append(image)

        run_response = self.run_response
        if run_response is not None:
            if run_response.images is None:
                run_response.images = []
            run_response.images.append(image)

    def add_video(self, video: VideoArtifact) -> None:
        """Track a generated video on the team and mirror it onto the active run response."""
        if self.videos is None:
            self.videos = []
        self.videos.append(video)

        run_response = self.run_response
        if run_response is not None:
            if run_response.videos is None:
                run_response.videos = []
            run_response.videos.append(video)

    def add_audio(self, audio: AudioArtifact) -> None:
        """Track a generated audio artifact on the team and mirror it onto the active run response."""
        if self.audio is None:
            self.audio = []
        self.audio.append(audio)

        run_response = self.run_response
        if run_response is not None:
            if run_response.audio is None:
                run_response.audio = []
            run_response.audio.append(audio)

    def get_images(self) -> Optional[List[ImageArtifact]]:
        """Return the images collected on this team, or None if none were added."""
        return self.images

    def get_videos(self) -> Optional[List[VideoArtifact]]:
        """Return the videos collected on this team, or None if none were added."""
        return self.videos

    def get_audio(self) -> Optional[List[AudioArtifact]]:
        """Return the audio artifacts collected on this team, or None if none were added."""
        return self.audio

    def update_reasoning_content_from_tool_call(
        self, run_response: TeamRunResponse, tool_name: str, tool_args: Dict[str, Any]
    ) -> Optional[ReasoningStep]:
        """Update reasoning_content based on tool calls that look like thinking or reasoning tools.

        Returns the ReasoningStep that was recorded, or None when the tool call
        does not match any known reasoning-tool shape.
        """
        name = tool_name.lower()

        # Case 1: ReasoningTools.think (has title, thought, optional action and confidence)
        if name == "think" and "title" in tool_args and "thought" in tool_args:
            title = tool_args["title"]
            thought = tool_args["thought"]
            action = tool_args.get("action", "")
            confidence = tool_args.get("confidence", None)

            step = ReasoningStep(
                title=title,
                reasoning=thought,
                action=action,
                next_action=NextAction.CONTINUE,
                confidence=confidence,
            )
            self._add_reasoning_step_to_extra_data(run_response, step)

            text = f"## {title}\n{thought}\n"
            if action:
                text += f"Action: {action}\n"
            if confidence is not None:
                text += f"Confidence: {confidence}\n"
            text += "\n"
            self._append_to_reasoning_content(run_response, text)
            return step

        # Case 2: ReasoningTools.analyze (has title, result, analysis, optional next_action and confidence)
        if name == "analyze" and "title" in tool_args:
            title = tool_args["title"]
            result = tool_args.get("result", "")
            analysis = tool_args.get("analysis", "")
            next_action = tool_args.get("next_action", "")
            confidence = tool_args.get("confidence", None)

            # Map the string next_action onto the NextAction enum
            lowered = next_action.lower()
            if lowered == "validate":
                next_action_enum = NextAction.VALIDATE
            elif lowered in ("final", "final_answer", "finalize"):
                next_action_enum = NextAction.FINAL_ANSWER
            else:
                next_action_enum = NextAction.CONTINUE

            step = ReasoningStep(
                title=title,
                result=result,
                reasoning=analysis,
                next_action=next_action_enum,
                confidence=confidence,
            )
            self._add_reasoning_step_to_extra_data(run_response, step)

            text = f"## {title}\n"
            if result:
                text += f"Result: {result}\n"
            if analysis:
                text += f"{analysis}\n"
            if next_action and lowered != "continue":
                text += f"Next Action: {next_action}\n"
            if confidence is not None:
                text += f"Confidence: {confidence}\n"
            text += "\n"
            self._append_to_reasoning_content(run_response, text)
            return step

        # Case 3: ThinkingTools.think (simple format, just has 'thought')
        if name == "think" and "thought" in tool_args:
            thought = tool_args["thought"]
            step = ReasoningStep(
                title="Thinking",
                reasoning=thought,
                confidence=None,
            )
            self._add_reasoning_step_to_extra_data(run_response, step)
            self._append_to_reasoning_content(run_response, f"## Thinking\n{thought}\n\n")
            return step

        return None

    def _append_to_reasoning_content(
        self, run_response: TeamRunResponse, content: str
    ) -> None:
        """Append text to run_response.reasoning_content, creating the field when unset/empty."""
        existing = getattr(run_response, "reasoning_content", None)
        if existing:
            run_response.reasoning_content = existing + content  # type: ignore
        else:
            run_response.reasoning_content = content  # type: ignore

    def _add_reasoning_step_to_extra_data(
        self, run_response: TeamRunResponse, reasoning_step: ReasoningStep
    ) -> None:
        """Append a reasoning step to run_response.extra_data, creating containers on first use."""
        if run_response.extra_data is None:
            from agno.run.response import RunResponseExtraData

            run_response.extra_data = RunResponseExtraData()

        extra_data = run_response.extra_data
        if extra_data.reasoning_steps is None:
            extra_data.reasoning_steps = []
        extra_data.reasoning_steps.append(reasoning_step)

    def _add_reasoning_metrics_to_extra_data(
        self, run_response: TeamRunResponse, reasoning_time_taken: float
    ) -> None:
        """Record the reasoning content and elapsed time as an assistant Message in extra_data.

        Any failure is logged and swallowed so metrics collection never breaks a run.
        """
        try:
            if run_response.extra_data is None:
                from agno.run.response import RunResponseExtraData

                run_response.extra_data = RunResponseExtraData()

            extra_data = run_response.extra_data
            if extra_data.reasoning_messages is None:
                extra_data.reasoning_messages = []

            # Capture the reasoning content with its timing metrics
            extra_data.reasoning_messages.append(
                Message(
                    role="assistant",
                    content=run_response.reasoning_content,
                    metrics={"time": reasoning_time_taken},
                )
            )
        except Exception as e:
            log_error(f"Failed to add reasoning metrics to extra_data: {str(e)}")

    ###########################################################################
    # Knowledge
    ###########################################################################

    def get_relevant_docs_from_knowledge(
        self,
        query: str,
        num_documents: Optional[int] = None,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Optional[List[Union[Dict[str, Any], str]]]:
        """Return a list of references from the knowledge base"""
        from agno.document import Document

        # Drop any filter keys the knowledge base does not recognize
        if self.knowledge is not None:
            valid_filters, invalid_keys = self.knowledge.validate_filters(filters)  # type: ignore
            if invalid_keys:
                log_warning(
                    f"Invalid filter keys provided: {invalid_keys}. These filters will be ignored."
                )
                log_info(
                    f"Valid filter keys are: {self.knowledge.valid_metadata_filters}"  # type: ignore
                )
                filters = valid_filters
                if not filters:
                    log_warning(
                        "No valid filters remain after validation. Search will proceed without filters."
                    )

        # A custom retriever takes precedence over the built-in knowledge search
        if self.retriever is not None and callable(self.retriever):
            from inspect import signature

            try:
                params = signature(self.retriever).parameters
                retriever_kwargs: Dict[str, Any] = {}
                # Only pass team/filters when the retriever declares them
                if "team" in params:
                    retriever_kwargs["team"] = self
                if "filters" in params:
                    retriever_kwargs["filters"] = filters
                retriever_kwargs["query"] = query
                retriever_kwargs["num_documents"] = num_documents
                retriever_kwargs.update(kwargs)
                return self.retriever(**retriever_kwargs)
            except Exception as e:
                log_warning(f"Retriever failed: {e}")
                raise e

        try:
            if self.knowledge is None or self.knowledge.vector_db is None:
                return None

            if num_documents is None:
                num_documents = self.knowledge.num_documents

            log_debug(f"Searching knowledge base with filters: {filters}")
            docs: List[Document] = self.knowledge.search(
                query=query, num_documents=num_documents, filters=filters
            )
            if not docs:
                log_debug("No relevant documents found for query")
                return None
            return [doc.to_dict() for doc in docs]
        except Exception as e:
            log_warning(f"Error searching knowledge base: {e}")
            raise e

    async def aget_relevant_docs_from_knowledge(
        self,
        query: str,
        num_documents: Optional[int] = None,
        filters: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> Optional[List[Union[Dict[str, Any], str]]]:
        """Get relevant documents from knowledge base asynchronously.

        Args:
            query: The search query.
            num_documents: Max documents to return; defaults to the knowledge base setting.
            filters: Metadata filters; invalid keys are dropped with a warning.
            **kwargs: Extra keyword arguments forwarded to a custom retriever.

        Returns:
            A list of document dicts (or whatever a custom retriever returns),
            or None when no knowledge base is configured or nothing matched.

        Raises:
            Exception: Re-raises any error from the retriever or vector search.
        """
        # Validate the filters against known valid filter keys
        if self.knowledge is not None:
            valid_filters, invalid_keys = self.knowledge.validate_filters(filters)  # type: ignore

            # Warn about invalid filter keys and keep only the valid ones
            if invalid_keys:
                log_warning(
                    f"Invalid filter keys provided: {invalid_keys}. These filters will be ignored."
                )
                log_info(
                    f"Valid filter keys are: {self.knowledge.valid_metadata_filters}"  # type: ignore
                )
                filters = valid_filters
                if not filters:
                    log_warning(
                        "No valid filters remain after validation. Search will proceed without filters."
                    )

        # A custom retriever takes precedence over the built-in knowledge search
        if self.retriever is not None and callable(self.retriever):
            from inspect import isawaitable, signature

            try:
                sig = signature(self.retriever)
                retriever_kwargs: Dict[str, Any] = {}
                if "team" in sig.parameters:
                    retriever_kwargs = {"team": self}
                if "filters" in sig.parameters:
                    retriever_kwargs["filters"] = filters
                retriever_kwargs.update(
                    {"query": query, "num_documents": num_documents, **kwargs}
                )
                result = self.retriever(**retriever_kwargs)
                # BUG FIX: an async retriever returns a coroutine, which was
                # previously returned un-awaited so callers received a coroutine
                # object instead of documents. Await it here; sync retrievers
                # are unaffected.
                if isawaitable(result):
                    result = await result
                return result
            except Exception as e:
                log_warning(f"Retriever failed: {e}")
                raise e

        try:
            if self.knowledge is None or self.knowledge.vector_db is None:
                return None

            if num_documents is None:
                num_documents = self.knowledge.num_documents

            log_debug(f"Searching knowledge base with filters: {filters}")
            # Imported here: Document is only needed for this branch's annotation
            from agno.document import Document

            relevant_docs: List[Document] = await self.knowledge.async_search(
                query=query, num_documents=num_documents, filters=filters
            )

            if not relevant_docs:
                log_debug("No relevant documents found for query")
                return None

            return [doc.to_dict() for doc in relevant_docs]
        except Exception as e:
            log_warning(f"Error searching knowledge base: {e}")
            raise e

    def _convert_documents_to_string(
        self, docs: List[Union[Dict[str, Any], str]]
    ) -> str:
        if docs is None or len(docs) == 0:
            return ""

        if self.references_format == "yaml":
            import yaml

            return yaml.dump(docs)

        import json

        return json.dumps(docs, indent=2)

    def _get_team_effective_filters(
        self, knowledge_filters: Optional[Dict[str, Any]] = None
    ) -> Optional[Dict[str, Any]]:
        """
        Determine effective filters for the team, considering:
        1. Team-level filters (self.knowledge_filters)
        2. Run-time filters (knowledge_filters)

        Priority: Run-time filters > Team filters
        """
        effective_filters = None

        # Start with team-level filters if they exist
        if self.knowledge_filters:
            effective_filters = self.knowledge_filters.copy()

        # Apply run-time filters if they exist
        if knowledge_filters:
            if effective_filters:
                effective_filters.update(knowledge_filters)
            else:
                effective_filters = knowledge_filters

        return effective_filters

    def search_knowledge_base_function(
        self,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        async_mode: bool = False,
    ) -> Callable:
        """Factory function to create a search_knowledge_base function with filters."""
        # NOTE(review): the closure docstrings below are presumably surfaced to the
        # model as the tool description -- treat their wording as part of the
        # runtime interface and confirm before editing them.

        def search_knowledge_base(query: str) -> str:
            """Use this function to search the knowledge base for information about a query.

            Args:
                query: The query to search for.

            Returns:
                str: A string containing the response from the knowledge base.
            """
            # Get the relevant documents from the knowledge base, passing filters
            self.run_response = cast(TeamRunResponse, self.run_response)
            retrieval_timer = Timer()
            retrieval_timer.start()
            docs_from_knowledge = self.get_relevant_docs_from_knowledge(
                query=query, filters=knowledge_filters
            )
            if docs_from_knowledge is not None:
                # Record the retrieval as references on the run response.
                # (elapsed is read before stop(); assumes Timer.elapsed is valid
                # while the timer is running -- TODO confirm)
                references = MessageReferences(
                    query=query,
                    references=docs_from_knowledge,
                    time=round(retrieval_timer.elapsed, 4),
                )
                # Add the references to the run_response, creating the
                # extra_data containers on first use
                if self.run_response.extra_data is None:
                    self.run_response.extra_data = RunResponseExtraData()
                if self.run_response.extra_data.references is None:
                    self.run_response.extra_data.references = []
                self.run_response.extra_data.references.append(references)
            retrieval_timer.stop()
            log_debug(f"Time to get references: {retrieval_timer.elapsed:.4f}s")

            if docs_from_knowledge is None:
                return "No documents found"
            return self._convert_documents_to_string(docs_from_knowledge)

        async def asearch_knowledge_base(query: str) -> str:
            """Use this function to search the knowledge base for information about a query asynchronously.

            Args:
                query: The query to search for.

            Returns:
                str: A string containing the response from the knowledge base.
            """
            # Async twin of search_knowledge_base above; keep both in sync
            self.run_response = cast(TeamRunResponse, self.run_response)
            retrieval_timer = Timer()
            retrieval_timer.start()
            docs_from_knowledge = await self.aget_relevant_docs_from_knowledge(
                query=query, filters=knowledge_filters
            )
            if docs_from_knowledge is not None:
                references = MessageReferences(
                    query=query,
                    references=docs_from_knowledge,
                    time=round(retrieval_timer.elapsed, 4),
                )
                # Attach references to the current run response
                if self.run_response.extra_data is None:
                    self.run_response.extra_data = RunResponseExtraData()
                if self.run_response.extra_data.references is None:
                    self.run_response.extra_data.references = []
                self.run_response.extra_data.references.append(references)
            retrieval_timer.stop()
            log_debug(f"Time to get references: {retrieval_timer.elapsed:.4f}s")

            if docs_from_knowledge is None:
                return "No documents found"
            return self._convert_documents_to_string(docs_from_knowledge)

        # Return the coroutine variant for async runs, the sync closure otherwise
        if async_mode:
            return asearch_knowledge_base
        else:
            return search_knowledge_base

    def search_knowledge_base_with_agentic_filters_function(
        self,
        knowledge_filters: Optional[Dict[str, Any]] = None,
        async_mode: bool = False,
    ) -> Callable:
        """Factory function to create a search_knowledge_base function with filters."""
        # Variant of search_knowledge_base_function where the model may supply
        # its own filters at call time; _get_agentic_or_user_search_filters
        # reconciles them with the user-provided knowledge_filters.
        # NOTE(review): the closure docstrings below are presumably surfaced to
        # the model as the tool description -- confirm before editing them.

        def search_knowledge_base(
            query: str, filters: Optional[Dict[str, Any]] = None
        ) -> str:
            """Use this function to search the knowledge base for information about a query.

            Args:
                query: The query to search for.
                filters: The filters to apply to the search. This is a dictionary of key-value pairs.

            Returns:
                str: A string containing the response from the knowledge base.
            """
            # Reconcile model-chosen (agentic) filters with user-provided ones
            search_filters = self._get_agentic_or_user_search_filters(
                filters, knowledge_filters
            )

            # Get the relevant documents from the knowledge base, passing filters
            self.run_response = cast(TeamRunResponse, self.run_response)
            retrieval_timer = Timer()
            retrieval_timer.start()
            docs_from_knowledge = self.get_relevant_docs_from_knowledge(
                query=query, filters=search_filters
            )
            if docs_from_knowledge is not None:
                # Record the retrieval as references on the run response.
                # (elapsed is read before stop(); assumes Timer.elapsed is valid
                # while the timer is running -- TODO confirm)
                references = MessageReferences(
                    query=query,
                    references=docs_from_knowledge,
                    time=round(retrieval_timer.elapsed, 4),
                )
                # Add the references to the run_response, creating the
                # extra_data containers on first use
                if self.run_response.extra_data is None:
                    self.run_response.extra_data = RunResponseExtraData()
                if self.run_response.extra_data.references is None:
                    self.run_response.extra_data.references = []
                self.run_response.extra_data.references.append(references)
            retrieval_timer.stop()
            log_debug(f"Time to get references: {retrieval_timer.elapsed:.4f}s")

            if docs_from_knowledge is None:
                return "No documents found"
            return self._convert_documents_to_string(docs_from_knowledge)

        async def asearch_knowledge_base(
            query: str, filters: Optional[Dict[str, Any]] = None
        ) -> str:
            """Use this function to search the knowledge base for information about a query asynchronously.

            Args:
                query: The query to search for.
                filters: The filters to apply to the search. This is a dictionary of key-value pairs.

            Returns:
                str: A string containing the response from the knowledge base.
            """
            # Async twin of search_knowledge_base above; keep both in sync
            search_filters = self._get_agentic_or_user_search_filters(
                filters, knowledge_filters
            )

            self.run_response = cast(TeamRunResponse, self.run_response)
            retrieval_timer = Timer()
            retrieval_timer.start()
            docs_from_knowledge = await self.aget_relevant_docs_from_knowledge(
                query=query, filters=search_filters
            )
            if docs_from_knowledge is not None:
                references = MessageReferences(
                    query=query,
                    references=docs_from_knowledge,
                    time=round(retrieval_timer.elapsed, 4),
                )
                # Attach references to the current run response
                if self.run_response.extra_data is None:
                    self.run_response.extra_data = RunResponseExtraData()
                if self.run_response.extra_data.references is None:
                    self.run_response.extra_data.references = []
                self.run_response.extra_data.references.append(references)
            retrieval_timer.stop()
            log_debug(f"Time to get references: {retrieval_timer.elapsed:.4f}s")

            if docs_from_knowledge is None:
                return "No documents found"
            return self._convert_documents_to_string(docs_from_knowledge)

        # Return the coroutine variant for async runs, the sync closure otherwise
        if async_mode:
            return asearch_knowledge_base
        else:
            return search_knowledge_base

    ###########################################################################
    # Logging
    ###########################################################################

    def _create_run_data(self) -> Dict[str, Any]:
        """Create and return the run data dictionary."""
        run_response_format = "text"
        if self.response_model is not None:
            run_response_format = "json"
        elif self.markdown:
            run_response_format = "markdown"

        functions = {}
        if self._functions_for_model:
            functions = {
                f_name: func.to_dict()
                for f_name, func in self._functions_for_model.items()
                if isinstance(func, Function)
            }

        run_data: Dict[str, Any] = {
            "functions": functions,
            "metrics": self.run_response.metrics,  # type: ignore
        }

        if self.monitoring:
            run_data.update(
                {
                    "run_input": self.run_input,
                    "run_response": self.run_response.to_dict(),  # type: ignore
                    "run_response_format": run_response_format,
                }
            )

        return run_data

    def _get_team_data(self) -> Dict[str, Any]:
        team_data: Dict[str, Any] = {}
        if self.name is not None:
            team_data["name"] = self.name
        if self.team_id is not None:
            team_data["team_id"] = self.team_id
        if self.model is not None:
            team_data["model"] = self.model.to_dict()
        if self.mode is not None:
            team_data["mode"] = self.mode
        return team_data

    def _get_session_data(self) -> Dict[str, Any]:
        session_data: Dict[str, Any] = {}
        if self.session_name is not None:
            session_data["session_name"] = self.session_name
        if self.session_state is not None and len(self.session_state) > 0:
            session_data["session_state"] = self.session_state
        if self.team_session_state is not None and len(self.team_session_state) > 0:
            session_data["team_session_state"] = self.team_session_state
        if self.session_metrics is not None:
            session_data["session_metrics"] = (
                asdict(self.session_metrics)
                if self.session_metrics is not None
                else None
            )
        if self.images is not None:
            session_data["images"] = [img.to_dict() for img in self.images]  # type: ignore
        if self.videos is not None:
            session_data["videos"] = [vid.to_dict() for vid in self.videos]  # type: ignore
        if self.audio is not None:
            session_data["audio"] = [aud.to_dict() for aud in self.audio]  # type: ignore
        return session_data

    def _get_team_session(
        self, session_id: str, user_id: Optional[str] = None
    ) -> TeamSession:
        """Build a TeamSession snapshot of this team, suitable for saving to storage.

        The original string here was placed after the `time` import, so it was
        a no-op statement rather than a docstring, and it incorrectly described
        the return value as a TeamMemory object.

        Args:
            session_id: The id of the session to snapshot.
            user_id: Optional id of the user the session belongs to.

        Returns:
            TeamSession: A serializable snapshot of the team's current session.
        """
        from time import time

        memory_dict = None
        if self.memory is not None:
            if isinstance(self.memory, TeamMemory):
                self.memory = cast(TeamMemory, self.memory)
                memory_dict = self.memory.to_dict()
            else:
                self.memory = cast(Memory, self.memory)
                # We fake the structure on storage, to maintain the interface
                # with the legacy implementation: the v2 Memory's runs for this
                # session are flattened into a "runs" list.
                if self.memory.runs is not None:
                    memory_dict = self.memory.to_dict()
                    run_responses = self.memory.runs.get(session_id)
                    if run_responses is not None:
                        memory_dict["runs"] = [rr.to_dict() for rr in run_responses]

        return TeamSession(
            session_id=session_id,
            team_id=self.team_id,
            user_id=user_id,
            team_session_id=self.team_session_id,
            memory=memory_dict,
            team_data=self._get_team_data(),
            session_data=self._get_session_data(),
            extra_data=self.extra_data,
            created_at=int(time()),
        )

    def _log_team_run(self, session_id: str, user_id: Optional[str] = None) -> None:
        if not self.telemetry and not self.monitoring:
            return

        from agno.api.team import TeamRunCreate, create_team_run

        try:
            run_data = self._create_run_data()
            team_session: TeamSession = self.team_session or self._get_team_session(
                session_id=session_id, user_id=user_id
            )

            create_team_run(
                run=TeamRunCreate(
                    run_id=self.run_id,  # type: ignore
                    run_data=run_data,
                    team_session_id=team_session.team_session_id,
                    session_id=team_session.session_id,
                    team_data=team_session.to_dict()
                    if self.monitoring
                    else team_session.telemetry_data(),
                ),
                monitor=self.monitoring,
            )
        except Exception as e:
            log_debug(f"Could not create team event: {e}")

    async def _alog_team_run(
        self, session_id: str, user_id: Optional[str] = None
    ) -> None:
        if not self.telemetry and not self.monitoring:
            return

        from agno.api.team import TeamRunCreate, acreate_team_run

        try:
            run_data = self._create_run_data()
            team_session: TeamSession = self.team_session or self._get_team_session(
                session_id=session_id, user_id=user_id
            )

            await acreate_team_run(
                run=TeamRunCreate(
                    run_id=self.run_id,
                    run_data=run_data,
                    session_id=team_session.session_id,
                    team_data=team_session.to_dict()
                    if self.monitoring
                    else team_session.telemetry_data(),
                ),
                monitor=self.monitoring,
            )
        except Exception as e:
            log_debug(f"Could not create team event: {e}")

    def _log_team_session(self, session_id: str, user_id: Optional[str] = None):
        if not (self.telemetry or self.monitoring):
            return

        from agno.api.team import TeamSessionCreate, upsert_team_session

        try:
            team_session: TeamSession = self.team_session or self._get_team_session(
                session_id=session_id, user_id=user_id
            )
            upsert_team_session(
                session=TeamSessionCreate(
                    session_id=team_session.session_id,
                    team_data=team_session.to_dict()
                    if self.monitoring
                    else team_session.telemetry_data(),
                ),
                monitor=self.monitoring,
            )
        except Exception as e:
            log_debug(f"Could not create team monitor: {e}")

    def _get_agentic_or_user_search_filters(
        self,
        filters: Optional[Dict[str, Any]],
        effective_filters: Optional[Dict[str, Any]],
    ) -> Dict[str, Any]:
        """Pick the final filters to apply to a knowledge search.

        Args:
            filters: Filters chosen by the agent.
            effective_filters: Filters supplied by the user.

        Returns:
            Dict[str, Any]: The user's filters when both are given (user
            overrides agent), the agent's filters when only those exist,
            otherwise an empty dict.
        """
        search_filters: Dict[str, Any] = {}
        if filters:
            # User-supplied filters take priority over agentic ones.
            search_filters = effective_filters if effective_filters else filters
        log_info(f"Filters used by Agent: {search_filters}")
        return search_filters

    def register_team(self) -> None:
        """Register this team with the Agno platform (no-op unless monitoring is on)."""
        self._set_monitoring()
        if not self.monitoring:
            return

        from agno.api.team import TeamCreate, create_team

        try:
            create_team(
                team=TeamCreate(
                    team_id=self.team_id,
                    name=self.name,
                    config=self.to_platform_dict(),
                    parent_team_id=self.parent_team_id,
                    app_id=self.app_id,
                    workflow_id=self.workflow_id,
                ),
            )
        except Exception as e:
            # Registration is best-effort: log and continue. A leftover debug
            # print() that duplicated this message was removed.
            log_debug(f"Could not create team on platform: {e}")

    async def _aregister_team(self) -> None:
        self._set_monitoring()
        if not self.monitoring:
            return

        from agno.api.team import TeamCreate, acreate_team

        try:
            await acreate_team(
                team=TeamCreate(
                    team_id=self.team_id,
                    name=self.name,
                    config=self.to_platform_dict(),
                    parent_team_id=self.parent_team_id,
                    app_id=self.app_id,
                    workflow_id=self.workflow_id,
                ),
            )
        except Exception as e:
            print(f"Could not create team on platform: {e}")
            log_debug(f"Could not create team on platform: {e}")

    def to_platform_dict(self) -> Dict[str, Any]:
        """Serialize this team's configuration for the Agno platform API.

        Covers the model, tool schemas, members (recursing one extra level
        into sub-teams), storage and memory configuration. Top-level keys
        whose value is None are dropped from the result.
        """
        # Summarize the team model as (class name, model id, provider), if set.
        model = None
        if self.model is not None:
            model = {
                "name": self.model.__class__.__name__,
                "model": self.model.id,
                "provider": self.model.provider,
            }
        tools: List[Dict[str, Any]] = []
        if self.tools is not None:
            # _tools_for_model is populated lazily; compute it now (requires a
            # model) so tool schemas can be reported.
            if not hasattr(self, "_tools_for_model") or self._tools_for_model is None:
                team_model = self.model
                if team_model is not None:
                    self.session_id = cast(str, self.session_id)
                    self.determine_tools_for_model(
                        model=team_model, session_id=self.session_id
                    )

            if self._tools_for_model is not None:
                for tool in self._tools_for_model:
                    # Only OpenAI-style {"type": "function", "function": {...}}
                    # entries are included in the payload.
                    if isinstance(tool, dict) and tool.get("type") == "function":
                        tools.append(tool["function"])
        payload = {
            # Each member contributes its own config dict plus its ids;
            # sub-teams are expanded one additional level into a nested
            # "members" list (deeper nesting is not serialized here).
            "members": [
                {
                    **(
                        member.get_agent_config_dict()
                        if isinstance(member, Agent)
                        else member.to_platform_dict()
                        if isinstance(member, Team)
                        else {}
                    ),
                    "agent_id": member.agent_id
                    if hasattr(member, "agent_id")
                    else None,
                    "team_id": member.team_id if hasattr(member, "team_id") else None,
                    "members": (
                        [
                            {
                                **(
                                    sub_member.get_agent_config_dict()
                                    if isinstance(sub_member, Agent)
                                    else sub_member.to_platform_dict()
                                    if isinstance(sub_member, Team)
                                    else {}
                                ),
                                "agent_id": sub_member.agent_id
                                if hasattr(sub_member, "agent_id")
                                else None,
                                "team_id": sub_member.team_id
                                if hasattr(sub_member, "team_id")
                                else None,
                            }
                            for sub_member in member.members
                            if sub_member is not None
                        ]
                        if isinstance(member, Team) and hasattr(member, "members")
                        else []
                    ),
                }
                for member in self.members
                if member is not None
            ],
            "mode": self.mode,
            "model": model,
            "tools": tools,
            "name": self.name,
            "instructions": self.instructions,
            "description": self.description,
            # Only the storage backend's class name is reported, not its config.
            "storage": {
                "name": self.storage.__class__.__name__,
            }
            if self.storage is not None
            else None,
            # "tools": [tool.to_dict() for tool in self.tools] if self.tools is not None else None,
            # Memory config is included only when the memory has a db backing
            # it; the memory's own model falls back to the team model.
            "memory": (
                {
                    "name": self.memory.__class__.__name__,
                    "model": (
                        {
                            "name": self.memory.model.__class__.__name__,
                            "model": self.memory.model.id,
                            "provider": self.memory.model.provider,
                        }
                        if hasattr(self.memory, "model")
                        and self.memory.model is not None
                        else (
                            {
                                "name": self.model.__class__.__name__,
                                "model": self.model.id,
                                "provider": self.model.provider,
                            }
                            if self.model is not None
                            else None
                        )
                    ),
                    "db": (
                        {
                            "name": self.memory.db.__class__.__name__,
                            "table_name": self.memory.db.table_name
                            if hasattr(self.memory.db, "table_name")
                            else None,
                            "db_url": self.memory.db.db_url
                            if hasattr(self.memory.db, "db_url")
                            else None,
                        }
                        if hasattr(self.memory, "db") and self.memory.db is not None
                        else None
                    ),
                }
                if self.memory is not None
                and hasattr(self.memory, "db")
                and self.memory.db is not None
                else None
            ),
        }
        # Drop unset (None) top-level keys from the payload.
        payload = {k: v for k, v in payload.items() if v is not None}
        return payload
